This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 3e6d6221e1149a9a068f7d3668fddee5967e196a
Author: Bankim Bhavsar <[email protected]>
AuthorDate: Wed Feb 5 12:13:35 2020 -0800

    [client] KUDU-2483 Add IN Bloom filter predicate to C++ client
    
    Instead of exposing BlockBloomFilter directly to clients,
    added a separate KuduBloomFilter class that uses PIMPL pattern
    to access the underlying BlockBloomFilter and allocator.
    
    To enable extending KuduBloomFilter in the future, added a builder
    class KuduBloomFilterBuilder.
    
    Change-Id: I4aa235a4c933ebce0bf3ec7fcb135098eccc4ea4
    Reviewed-on: http://gerrit.cloudera.org:8080/15122
    Reviewed-by: Adar Dembo <[email protected]>
    Tested-by: Kudu Jenkins
---
 src/kudu/client/CMakeLists.txt            |   2 +
 src/kudu/client/client.cc                 |  43 +++++++
 src/kudu/client/client.h                  |  39 ++++++-
 src/kudu/client/hash-internal.h           |  31 +++++
 src/kudu/client/hash.cc                   |  51 ++++++++
 src/kudu/client/hash.h                    |  33 ++++++
 src/kudu/client/predicate-test.cc         | 188 ++++++++++++++++++++++++++++--
 src/kudu/client/scan_predicate-internal.h |  56 +++++++++
 src/kudu/client/scan_predicate.cc         | 140 ++++++++++++++++++++++
 src/kudu/client/scan_predicate.h          | 106 +++++++++++++++++
 src/kudu/client/value.h                   |   1 +
 11 files changed, 679 insertions(+), 11 deletions(-)

diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt
index 38f8d21..ffd536a 100644
--- a/src/kudu/client/CMakeLists.txt
+++ b/src/kudu/client/CMakeLists.txt
@@ -36,6 +36,7 @@ set(CLIENT_SRCS
   client-internal.cc
   error_collector.cc
   error-internal.cc
+  hash.cc
   master_rpc.cc
   master_proxy_rpc.cc
   meta_cache.cc
@@ -176,6 +177,7 @@ install(TARGETS kudu_client_exported
 install(FILES
   callbacks.h
   client.h
+  hash.h
   row_result.h
   scan_batch.h
   scan_predicate.h
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index ba7fa2b..28cee2a 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -971,6 +971,49 @@ KuduPredicate* KuduTable::NewComparisonPredicate(const Slice& col_name,
   });
 }
 
+KuduPredicate* KuduTable::NewInBloomFilterPredicate(
+    const Slice& col_name,
+    vector<KuduBloomFilter*>* bloom_filters,
+    KuduValue* lower_bound_inclusive,
+    KuduValue* upper_bound_exclusive) {
+  // We always take ownership of values; this ensures cleanup if the predicate is invalid.
+  auto cleanup = MakeScopedCleanup([&]() {
+    STLDeleteElements(bloom_filters);
+  });
+
+  unique_ptr<KuduValue> lower_bound_inclusive_uniq_ptr(lower_bound_inclusive);
+  unique_ptr<KuduValue> upper_bound_exclusive_uniq_ptr(upper_bound_exclusive);
+
+  // Empty vector of bloom filters along with no upper or lower bound will select
+  // all rows. Hence disallowed.
+  if (bloom_filters->empty() && !lower_bound_inclusive && !upper_bound_exclusive) {
+    return new KuduPredicate(
+        new ErrorPredicateData(Status::InvalidArgument("No predicates supplied")));
+  }
+
+  // Transfer the Bloom filter raw ptrs over to vector of unique ptrs.
+  // There is a possibility of emplace_back() throwing exception, so in such a case the
+  // transferred Bloom filters in unique_ptrs will be cleaned-up on exiting scope
+  // automatically and the non-nullptr Bloom filters in input "bloom_filters" vector
+  // will be cleaned up by the explicit scoped "cleanup".
+  vector<unique_ptr<KuduBloomFilter>> bloom_filters_owned;
+  bloom_filters_owned.reserve(bloom_filters->size());
+  for (auto& bf : *bloom_filters) {
+    bloom_filters_owned.emplace_back(bf);
+    bf = nullptr;
+  }
+
+  return data_->MakePredicate(col_name, [&](const ColumnSchema& col_schema) {
+    // At this point we could cancel the scoped "cleanup". But the scoped cleanup
+    // not only deletes pointers contained in the vector but also clears the vector
+    // and we want the vector be cleared as expected by the caller.
+    return new KuduPredicate(
+        new InBloomFilterPredicateData(col_schema, std::move(bloom_filters_owned),
+                                       std::move(lower_bound_inclusive_uniq_ptr),
+                                       std::move(upper_bound_exclusive_uniq_ptr)));
+  });
+}
+
 KuduPredicate* KuduTable::NewInListPredicate(const Slice& col_name,
                                              vector<KuduValue*>* values) {
   // We always take ownership of values; this ensures cleanup if the predicate is invalid.
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index dd58883..90cdb8a 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -90,8 +90,6 @@ class KuduWriteOperation;
 class ResourceMetrics;
 
 namespace internal {
-template <class ReqClass, class RespClass>
-class AsyncLeaderMasterRpc; // IWYU pragma: keep
 class Batcher;
 class ErrorCollector;
 class GetTableSchemaRpc;
@@ -102,6 +100,8 @@ class RemoteTabletServer;
 class ReplicaController;
 class RetrieveAuthzTokenRpc;
 class WriteRpc;
+template <class ReqClass, class RespClass>
+class AsyncLeaderMasterRpc; // IWYU pragma: keep
 } // namespace internal
 
 /// Install a callback for internal client logging.
@@ -1086,6 +1086,41 @@ class KUDU_EXPORT KuduTable : public sp::enable_shared_from_this<KuduTable> {
                                         KuduPredicate::ComparisonOp op,
                                         KuduValue* value);
 
+  /// Create a new IN Bloom filter predicate which can be used for scanners on
+  /// this table.
+  ///
+  /// A Bloom filter is a space-efficient probabilistic data structure used to
+  /// test set membership with a possibility of false positive matches.
+  /// See @c KuduBloomFilter for creating Bloom filters.
+  ///
+  /// IN list predicate can be used with small number of values; on the other
+  /// hand with IN Bloom filter predicate large number of values can be tested
+  /// for membership in a space-efficient manner.
+  ///
+  /// @param [in] col_name
+  ///   Name of the column to which the predicate applies.
+  /// @param [in] bloom_filters
+  ///   Vector of Bloom filters that contain the values inserted to match
+  ///   against the column. The column value must match against all the
+  ///   supplied Bloom filters to be returned by the scanner. On return,
+  ///   regardless of success or error, the returned predicate will take
+  ///   ownership of the pointers contained in @c bloom_filters.
+  /// @param [in] lower_bound_inclusive
+  /// @param [in] upper_bound_exclusive
+  ///   Optional [lower_bound, upper_bound) filters where type of the bound
+  ///   must correspond to the value of the column to which the predicate is
+  ///   to be applied.
+  /// @return Raw pointer to an IN Bloom filter predicate. The caller owns the
+  ///   predicate until it is passed into KuduScanner::AddConjunctPredicate().
+  ///   In the case of an error (e.g. an invalid column name),
+  ///   a non-NULL value is still returned. The error will be returned when
+  ///   attempting to add this predicate to a KuduScanner.
+  KuduPredicate* NewInBloomFilterPredicate(
+      const Slice& col_name,
+      std::vector<KuduBloomFilter*>* bloom_filters,
+      KuduValue* lower_bound_inclusive = NULL, // NOLINT(modernize-use-nullptr)
+      KuduValue* upper_bound_exclusive = NULL); // NOLINT(modernize-use-nullptr)
+
   /// Create a new IN list predicate which can be used for scanners on this
   /// table.
   ///
diff --git a/src/kudu/client/hash-internal.h b/src/kudu/client/hash-internal.h
new file mode 100644
index 0000000..9869114
--- /dev/null
+++ b/src/kudu/client/hash-internal.h
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "kudu/client/hash.h"
+#include "kudu/util/hash.pb.h"
+
+namespace kudu {
+namespace client {
+
+// Helper functions to convert hash algorithm between client-facing and internal PB enums.
+kudu::HashAlgorithm ToInternalHashAlgorithm(HashAlgorithm hash_algorithm);
+HashAlgorithm FromInternalHashAlgorithm(kudu::HashAlgorithm hash_algorithm);
+
+} // namespace client
+} // namespace kudu
diff --git a/src/kudu/client/hash.cc b/src/kudu/client/hash.cc
new file mode 100644
index 0000000..7cccfe4
--- /dev/null
+++ b/src/kudu/client/hash.cc
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/client/hash.h"
+
+#include <ostream>
+
+#include <glog/logging.h>
+
+#include "kudu/client/hash-internal.h"
+#include "kudu/util/hash.pb.h"
+
+namespace kudu {
+namespace client {
+
+kudu::HashAlgorithm ToInternalHashAlgorithm(HashAlgorithm hash_algorithm) {
+  switch (hash_algorithm) {
+    case UNKNOWN_HASH: return kudu::UNKNOWN_HASH;
+    case MURMUR_HASH_2: return kudu::MURMUR_HASH_2;
+    case CITY_HASH: return kudu::CITY_HASH;
+    case FAST_HASH: return kudu::FAST_HASH;
+    default: LOG(FATAL)  << "Unexpected hash algorithm: " << hash_algorithm;
+  }
+}
+
+HashAlgorithm FromInternalHashAlgorithm(kudu::HashAlgorithm hash_algorithm) {
+  switch (hash_algorithm) {
+    case kudu::UNKNOWN_HASH: return UNKNOWN_HASH;
+    case kudu::MURMUR_HASH_2: return MURMUR_HASH_2;
+    case kudu::CITY_HASH: return CITY_HASH;
+    case kudu::FAST_HASH: return FAST_HASH;
+    default: LOG(FATAL)  << "Unexpected hash algorithm: " << hash_algorithm;
+  }
+}
+
+} // namespace client
+} // namespace kudu
diff --git a/src/kudu/client/hash.h b/src/kudu/client/hash.h
new file mode 100644
index 0000000..2fb0926
--- /dev/null
+++ b/src/kudu/client/hash.h
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_CLIENT_HASH_H
+#define KUDU_CLIENT_HASH_H
+
+namespace kudu {
+namespace client {
+
+/// @brief: Hash algorithm types.
+enum HashAlgorithm {
+  UNKNOWN_HASH = 0,
+  MURMUR_HASH_2 = 1,
+  CITY_HASH = 2,
+  FAST_HASH = 3
+};
+
+} // namespace client
+} // namespace kudu
+#endif // KUDU_CLIENT_HASH_H
diff --git a/src/kudu/client/predicate-test.cc b/src/kudu/client/predicate-test.cc
index 0b85d7a..78138c1 100644
--- a/src/kudu/client/predicate-test.cc
+++ b/src/kudu/client/predicate-test.cc
@@ -16,12 +16,16 @@
 // under the License.
 
 #include <algorithm>
+#include <cmath>
 #include <cstdint>
 #include <initializer_list>
+#include <iterator>
 #include <limits>
 #include <memory>
 #include <ostream>
 #include <string>
+#include <type_traits>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -37,11 +41,15 @@
 #include "kudu/client/write_op.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/integral_types.h"
 #include "kudu/gutil/strings/escaping.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/util/decimal_util.h"
 #include "kudu/util/int128.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -50,6 +58,7 @@ using std::count_if;
 using std::numeric_limits;
 using std::string;
 using std::unique_ptr;
+using std::unordered_set;
 using std::vector;
 
 namespace kudu {
@@ -62,6 +71,7 @@ using sp::shared_ptr;
 class PredicateTest : public KuduTest {
 
  protected:
+  static constexpr float kBloomFilterFalsePositiveProb = 0.01;
 
   void SetUp() override {
     // Set up the mini cluster
@@ -154,12 +164,29 @@ class PredicateTest : public KuduTest {
     for (const T& v : values) {
       if (std::any_of(test_values.begin(), test_values.end(),
                       [&] (const T& t) { return t == v; })) {
-        count += 1;
+        count++;
       }
     }
     return count;
   }
 
+  // This function helps distinguish floating point values like -0.0 against 0.0.
+  template<typename F>
+  int CountMatchedFloatingPointRowsWithSignBit(const vector<F>& values,
+                                               const vector<F>& test_values) {
+    static_assert(std::is_floating_point<F>::value,
+                  "This function must only be used with floating point data types");
+    int count = 0;
+    for (const F& v : values) {
+      if (std::any_of(test_values.begin(), test_values.end(),
+                      [&] (const F& f) { return f == v && std::signbit(f) == std::signbit(v); })) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+
   // Returns a vector of ints from -50 (inclusive) to 50 (exclusive), and
   // boundary values.
   template <typename T>
@@ -289,6 +316,25 @@ class PredicateTest : public KuduTest {
     };
   }
 
+  static KuduBloomFilter* CreateBloomFilter(
+      int nkeys,
+      float false_positive_prob = kBloomFilterFalsePositiveProb) {
+    KuduBloomFilterBuilder builder(nkeys);
+    builder.false_positive_probability(false_positive_prob);
+    KuduBloomFilter* bf;
+    CHECK_OK(builder.Build(&bf));
+    return bf;
+  }
+
+  void CheckInBloomFilterPredicate(const shared_ptr<KuduTable>& table,
+                                   KuduBloomFilter* in_bloom_filter,
+                                   int expected_count) {
+    vector<KuduBloomFilter*> bf_vec = { in_bloom_filter };
+    auto* bf_predicate = table->NewInBloomFilterPredicate("value", &bf_vec);
+    ASSERT_TRUE(bf_vec.empty());
+    ASSERT_EQ(expected_count, CountRows(table, { bf_predicate }));
+  }
+
   // Check integer predicates against the specified table. The table must have
   // key/value rows with values from CreateIntValues, plus one null value.
   template <typename T>
@@ -379,18 +425,22 @@ class PredicateTest : public KuduTest {
       }
     }
 
-    // IN list predicates
+    // IN list and IN Bloom filter predicates
     std::random_shuffle(test_values.begin(), test_values.end());
 
     for (auto end = test_values.begin(); end <= test_values.end(); end++) {
       vector<KuduValue*> vals;
+      auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end));
 
       for (auto itr = test_values.begin(); itr != end; itr++) {
         vals.push_back(KuduValue::FromInt(*itr));
+        auto key = *itr;
+        bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
       }
 
       int count = CountMatchedRows<T>(values, vector<T>(test_values.begin(), end));
       ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
+      CheckInBloomFilterPredicate(table, bloom_filter, count);
     }
 
     // IS NOT NULL predicate
@@ -402,7 +452,7 @@ class PredicateTest : public KuduTest {
   }
 
   // Check string predicates against the specified table.
-  void CheckStringPredicates(const shared_ptr<KuduTable>& table, vector<string> values) {
+  void CheckStringPredicates(const shared_ptr<KuduTable>& table, const vector<string>& values) {
 
     ASSERT_EQ(values.size() + 1, CountRows(table, {}));
 
@@ -494,18 +544,21 @@ class PredicateTest : public KuduTest {
       }
     }
 
-    // IN list predicates
+    // IN list and IN Bloom filter predicates
     std::random_shuffle(test_values.begin(), test_values.end());
 
     for (auto end = test_values.begin(); end <= test_values.end(); end++) {
       vector<KuduValue*> vals;
+      auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end));
 
       for (auto itr = test_values.begin(); itr != end; itr++) {
         vals.push_back(KuduValue::CopyString(*itr));
+        bloom_filter->Insert(*itr);
       }
 
       int count = CountMatchedRows<string>(values, vector<string>(test_values.begin(), end));
       ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
+      CheckInBloomFilterPredicate(table, bloom_filter, count);
     }
 
     // IS NOT NULL predicate
@@ -634,6 +687,44 @@ TEST_F(PredicateTest, TestBoolPredicates) {
     ASSERT_EQ(2, CountRows(table, { pred }));
   }
 
+  { // empty BloomFilter
+    auto* bloom_filter = CreateBloomFilter(0);
+    CheckInBloomFilterPredicate(table, bloom_filter, 0);
+  }
+
+  { // vector with no BloomFilters
+    vector<KuduBloomFilter*> no_bloom_filters = {};
+    auto* bf_predicate = table->NewInBloomFilterPredicate("value", &no_bloom_filters);
+    ASSERT_TRUE(no_bloom_filters.empty());
+    KuduScanner scanner(table.get());
+    Status s = scanner.AddConjunctPredicate(bf_predicate);
+    ASSERT_TRUE(s.IsInvalidArgument());
+    ASSERT_STR_CONTAINS(s.ToString(), "No predicates supplied");
+  }
+
+  { // BloomFilter with (true)
+    auto* bloom_filter = CreateBloomFilter(1);
+    bool key = true;
+    bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
+    CheckInBloomFilterPredicate(table, bloom_filter, 1);
+  }
+
+  { // BloomFilter with (false)
+    auto* bloom_filter = CreateBloomFilter(1);
+    bool key = false;
+    bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
+    CheckInBloomFilterPredicate(table, bloom_filter, 1);
+  }
+
+  { // BloomFilter with (true, false)
+    auto* bloom_filter = CreateBloomFilter(2);
+    bool key = true;
+    bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
+    key = false;
+    bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
+    CheckInBloomFilterPredicate(table, bloom_filter, 2);
+  }
+
   // IS NOT NULL predicate
   ASSERT_EQ(CountRows(table, {}) - 1,
             CountRows(table, { table->NewIsNotNullPredicate("value") }));
@@ -864,18 +955,24 @@ TEST_F(PredicateTest, TestFloatPredicates) {
     }
   }
 
-  // IN list predicates
+  // IN list and IN Bloom filter predicates
   std::random_shuffle(test_values.begin(), test_values.end());
 
   for (auto end = test_values.begin(); end <= test_values.end(); end++) {
     vector<KuduValue*> vals;
+    auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end));
 
     for (auto itr = test_values.begin(); itr != end; itr++) {
       vals.push_back(KuduValue::FromFloat(*itr));
+      auto key = *itr;
+      bloom_filter->Insert(Slice(reinterpret_cast<const uint8*>(&key), sizeof(key)));
     }
 
-    int count = CountMatchedRows<float>(values, vector<float>(test_values.begin(), end));
+    int count = CountMatchedRows(values, vector<float>(test_values.begin(), end));
     ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
+    int count_with_sign_bit = CountMatchedFloatingPointRowsWithSignBit(
+        values, vector<float>(test_values.begin(), end));
+    CheckInBloomFilterPredicate(table, bloom_filter, count_with_sign_bit);
   }
 
   // IS NOT NULL predicate
@@ -988,18 +1085,24 @@ TEST_F(PredicateTest, TestDoublePredicates) {
     }
   }
 
-  // IN list predicates
+  // IN list and IN Bloom filter predicates
   std::random_shuffle(test_values.begin(), test_values.end());
 
   for (auto end = test_values.begin(); end <= test_values.end(); end++) {
     vector<KuduValue*> vals;
+    auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end));
 
     for (auto itr = test_values.begin(); itr != end; itr++) {
       vals.push_back(KuduValue::FromDouble(*itr));
+      auto key = *itr;
+      bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
     }
 
-    int count = CountMatchedRows<double>(values, vector<double>(test_values.begin(), end));
+    int count = CountMatchedRows(values, vector<double>(test_values.begin(), end));
     ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
+    int count_with_sign_bit = CountMatchedFloatingPointRowsWithSignBit(
+        values, vector<double>(test_values.begin(), end));
+    CheckInBloomFilterPredicate(table, bloom_filter, count_with_sign_bit);
   }
 
   // IS NOT NULL predicate
@@ -1130,18 +1233,22 @@ TEST_F(PredicateTest, TestDecimalPredicates) {
     }
   }
 
-  // IN list predicates
+  // IN list and IN Bloom filter predicates
   std::random_shuffle(test_values.begin(), test_values.end());
 
   for (auto end = test_values.begin(); end <= test_values.end(); end++) {
     vector<KuduValue*> vals;
+    auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end));
 
     for (auto itr = test_values.begin(); itr != end; itr++) {
       vals.push_back(KuduValue::FromDecimal(*itr, 2));
+      auto key = *itr;
+      bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)));
     }
 
     int count = CountMatchedRows<int128_t>(values, vector<int128_t>(test_values.begin(), end));
     ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) }));
+    CheckInBloomFilterPredicate(table, bloom_filter, count);
   }
 
   // IS NOT NULL predicate
@@ -1152,6 +1259,69 @@ TEST_F(PredicateTest, TestDecimalPredicates) {
   ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") }));
 }
 
+class BloomFilterPredicateTest : public PredicateTest {
+ protected:
+  template<class Collection>
+  static KuduBloomFilter* CreateBloomFilterWithValues(const Collection& values) {
+    auto* bloom_filter = CreateBloomFilter(values.size());
+    for (const auto& v : values) {
+      bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&v), sizeof(v)));
+    }
+    return bloom_filter;
+  }
+};
+
+TEST_F(BloomFilterPredicateTest, TestBloomFilterPredicate) {
+  shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::INT32);
+  shared_ptr<KuduSession> session = CreateSession();
+
+  auto seed = SeedRandom();
+  Random rand(seed);
+  // Number of values to be written to the table.
+  static constexpr int kNumAllValues = 100000;
+  // Subset of values from the table that'll be inserted in BloomFilter and searched against
+  // all values in the table.
+  static constexpr int kNumInclusiveValues = 10000;
+  // Values that are not present in the table.
+  static constexpr int kNumExclusiveValues = 10000;
+  // Number of false positives based on the number of values that'll be searched against.
+  static constexpr int kFalsePositives = kNumAllValues * kBloomFilterFalsePositiveProb;
+
+  const unordered_set<int32_t> empty_set;
+  auto all_values = CreateRandomUniqueIntegers<int32_t>(kNumAllValues, empty_set, &rand);
+  vector<int32_t> inclusive_values;
+  ReservoirSample(all_values, kNumInclusiveValues, empty_set, &rand, &inclusive_values);
+  auto* inclusive_bf = CreateBloomFilterWithValues(inclusive_values);
+  auto exclusive_values = CreateRandomUniqueIntegers<int32_t>(kNumExclusiveValues, all_values,
+                                                              &rand);
+  auto* exclusive_bf = CreateBloomFilterWithValues(exclusive_values);
+
+  int i = 0;
+  for (auto value : all_values) {
+    unique_ptr<KuduInsert> insert(table->NewInsert());
+    ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
+    ASSERT_OK(insert->mutable_row()->SetInt32("value", value));
+    ASSERT_OK(session->Apply(insert.release()));
+  }
+  ASSERT_OK(session->Flush());
+
+  vector<KuduBloomFilter*> inclusive_bf_vec = { inclusive_bf };
+  auto* inclusive_predicate =
+      table->NewInBloomFilterPredicate("value", &inclusive_bf_vec);
+  ASSERT_TRUE(inclusive_bf_vec.empty());
+  int actual_count_inclusive = CountRows(table, { inclusive_predicate });
+  EXPECT_LE(inclusive_values.size(), actual_count_inclusive);
+  EXPECT_GE(inclusive_values.size() + kFalsePositives, actual_count_inclusive);
+
+  vector<KuduBloomFilter*> exclusive_bf_vec = { exclusive_bf };
+  auto* exclusive_predicate =
+      table->NewInBloomFilterPredicate("value", &exclusive_bf_vec);
+  ASSERT_TRUE(exclusive_bf_vec.empty());
+  int actual_count_exclusive = CountRows(table, { exclusive_predicate });
+  EXPECT_LE(0, actual_count_exclusive);
+  EXPECT_GE(kFalsePositives, actual_count_exclusive);
+}
+
 class ParameterizedPredicateTest : public PredicateTest,
   public ::testing::WithParamInterface<KuduColumnSchema::DataType> {};
 
diff --git a/src/kudu/client/scan_predicate-internal.h b/src/kudu/client/scan_predicate-internal.h
index acca88f..41359f1 100644
--- a/src/kudu/client/scan_predicate-internal.h
+++ b/src/kudu/client/scan_predicate-internal.h
@@ -91,6 +91,32 @@ class ComparisonPredicateData : public KuduPredicate::Data {
   gscoped_ptr<KuduValue> val_;
 };
 
+// An InBloomFilter predicate for selecting values present in the vector of Bloom filters.
+class InBloomFilterPredicateData : public KuduPredicate::Data {
+ public:
+  InBloomFilterPredicateData(ColumnSchema col,
+                             std::vector<std::unique_ptr<KuduBloomFilter>> bloom_filters,
+                             std::unique_ptr<KuduValue> lower = nullptr,
+                             std::unique_ptr<KuduValue> upper = nullptr)
+      : col_(std::move(col)),
+        bloom_filters_(std::move(bloom_filters)),
+        lower_(std::move(lower)),
+        upper_(std::move(upper)) {
+  }
+
+  Status AddToScanSpec(ScanSpec* spec, Arena* arena) override;
+
+  InBloomFilterPredicateData* Clone() const override;
+
+ private:
+  Status CheckTypeAndGetPointer(KuduValue* val_in, void** val_out) const;
+
+  ColumnSchema col_;
+  std::vector<std::unique_ptr<KuduBloomFilter>> bloom_filters_;
+  std::unique_ptr<KuduValue> lower_;
+  std::unique_ptr<KuduValue> upper_;
+};
+
 // A list predicate for a column and a list of constant values.
 class InListPredicateData : public KuduPredicate::Data {
  public:
@@ -161,6 +187,36 @@ class IsNullPredicateData : public KuduPredicate::Data {
   ColumnSchema col_;
 };
 
+class KuduBloomFilterBuilder::Data {
+ public:
+  Data();
+  ~Data() = default;
+
+  size_t num_keys_;
+  double false_positive_probability_;
+  HashAlgorithm hash_algorithm_;
+  uint32_t hash_seed_;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Data);
+};
+
+class KuduBloomFilter::Data {
+ public:
+  Data() = default;
+  ~Data() = default;
+  std::unique_ptr<Data> Clone() const;
+
+  std::shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator_;
+  std::unique_ptr<BlockBloomFilter> bloom_filter_;
+
+ private:
+  Data(std::shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator,
+       std::unique_ptr<BlockBloomFilter> bloom_filter);
+
+  DISALLOW_COPY_AND_ASSIGN(Data);
+};
+
 } // namespace client
 } // namespace kudu
 #endif /* KUDU_CLIENT_SCAN_PREDICATE_INTERNAL_H */
diff --git a/src/kudu/client/scan_predicate.cc b/src/kudu/client/scan_predicate.cc
index 736d078..05657dd 100644
--- a/src/kudu/client/scan_predicate.cc
+++ b/src/kudu/client/scan_predicate.cc
@@ -17,11 +17,14 @@
 
 #include "kudu/client/scan_predicate.h"
 
+#include <memory>
 #include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
+#include <glog/logging.h>
 
+#include "kudu/client/hash-internal.h"
 #include "kudu/client/scan_predicate-internal.h"
 #include "kudu/client/value-internal.h"
 #include "kudu/client/value.h"
@@ -32,10 +35,13 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/block_bloom_filter.h"
 #include "kudu/util/status.h"
 
 using boost::optional;
 using std::move;
+using std::shared_ptr;
+using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
@@ -148,5 +154,139 @@ Status InListPredicateData::AddToScanSpec(ScanSpec* spec, Arena* /*arena*/) {
   return Status::OK();
 }
 
+Status InBloomFilterPredicateData::CheckTypeAndGetPointer(KuduValue* val_in,
+                                                          void** val_out) const {
+  return val_in->data_->CheckTypeAndGetPointer(col_.name(),
+                                               col_.type_info()->type(),
+                                               col_.type_attributes(),
+                                               val_out);
+}
+
+// Resolves the optional lower/upper bound values against the column type and
+// adds an IN Bloom filter column predicate to 'spec'. The arena is unused.
+Status InBloomFilterPredicateData::AddToScanSpec(ScanSpec* spec, Arena* /*arena*/) {
+  void* lower_bound = nullptr;
+  void* upper_bound = nullptr;
+  if (lower_) {
+    RETURN_NOT_OK(CheckTypeAndGetPointer(lower_.get(), &lower_bound));
+  }
+  if (upper_) {
+    RETURN_NOT_OK(CheckTypeAndGetPointer(upper_.get(), &upper_bound));
+  }
+
+  // ColumnPredicate takes raw BlockBloomFilter pointers; the filters
+  // themselves stay owned by the KuduBloomFilter objects in 'bloom_filters_'.
+  vector<BlockBloomFilter*> filters;
+  filters.reserve(bloom_filters_.size());
+  for (const auto& kudu_filter : bloom_filters_) {
+    filters.push_back(kudu_filter->data_->bloom_filter_.get());
+  }
+
+  spec->AddPredicate(
+      ColumnPredicate::InBloomFilter(col_, std::move(filters), lower_bound, upper_bound));
+  return Status::OK();
+}
+
+// Returns a new predicate built from clones of the optional bounds and of every
+// Bloom filter. The caller owns the returned pointer.
+InBloomFilterPredicateData* InBloomFilterPredicateData::Clone() const {
+  unique_ptr<KuduValue> lower_copy(lower_ ? lower_->Clone() : nullptr);
+  unique_ptr<KuduValue> upper_copy(upper_ ? upper_->Clone() : nullptr);
+
+  vector<unique_ptr<KuduBloomFilter>> filter_copies;
+  filter_copies.reserve(bloom_filters_.size());
+  for (const auto& filter : bloom_filters_) {
+    filter_copies.emplace_back(filter->Clone());
+  }
+
+  return new InBloomFilterPredicateData(col_, std::move(filter_copies),
+                                        std::move(lower_copy), std::move(upper_copy));
+}
+
+// Creates a filter with empty internal state; the allocator and the underlying
+// BlockBloomFilter are filled in by KuduBloomFilterBuilder::Build().
+KuduBloomFilter::KuduBloomFilter()
+    : data_(new Data()) {
+}
+
+// Takes ownership of 'other_data', which must be non-null (CHECK-enforced).
+KuduBloomFilter::KuduBloomFilter(Data* other_data)
+    : data_(CHECK_NOTNULL(other_data)) {
+}
+
+KuduBloomFilter::~KuduBloomFilter() {
+  delete data_;
+}
+
+// Returns a new filter holding a clone of this filter's internal state
+// (see KuduBloomFilter::Data::Clone()). The caller owns the result.
+KuduBloomFilter* KuduBloomFilter::Clone() const {
+  return new KuduBloomFilter(data_->Clone().release());
+}
+
+// Inserts 'key' into the underlying BlockBloomFilter. In debug builds a
+// missing (not yet built) filter triggers a DCHECK failure.
+void KuduBloomFilter::Insert(const Slice& key) {
+  BlockBloomFilter* const filter = DCHECK_NOTNULL(data_->bloom_filter_.get());
+  filter->Insert(key);
+}
+
+// Builder defaults: no expected keys, 1% false positive probability,
+// FAST_HASH, and a hash seed of 0.
+KuduBloomFilterBuilder::Data::Data()
+    : num_keys_(0),
+      false_positive_probability_(0.01),
+      hash_algorithm_(FAST_HASH),
+      hash_seed_(0) {
+}
+
+KuduBloomFilterBuilder::KuduBloomFilterBuilder(size_t num_keys)
+    : data_(new Data) {
+  data_->num_keys_ = num_keys;
+}
+
+KuduBloomFilterBuilder::~KuduBloomFilterBuilder() {
+  delete data_;
+}
+
+// The setters below only record their argument and return '*this' so that
+// calls can be chained; nothing is validated until Build().
+KuduBloomFilterBuilder& KuduBloomFilterBuilder::false_positive_probability(double fpp) {
+  data_->false_positive_probability_ = fpp;
+  return *this;
+}
+
+KuduBloomFilterBuilder& KuduBloomFilterBuilder::hash_algorithm(HashAlgorithm hash_algorithm) {
+  data_->hash_algorithm_ = hash_algorithm;
+  return *this;
+}
+
+KuduBloomFilterBuilder& KuduBloomFilterBuilder::hash_seed(uint32_t hash_seed) {
+  data_->hash_seed_ = hash_seed;
+  return *this;
+}
+
+// Creates a new KuduBloomFilter sized for 'num_keys_' at the configured false
+// positive probability, backed by the default (singleton) buffer allocator and
+// initialized with the configured hash algorithm and seed.
+//
+// Returns InvalidArgument if the false positive probability is not strictly
+// between 0.0 and 1.0 (NaN included), or the error from
+// BlockBloomFilter::Init(). On success '*bloom_filter_out' receives a filter
+// owned by the caller.
+Status KuduBloomFilterBuilder::Build(KuduBloomFilter** bloom_filter_out) {
+  // The filter size is derived from fpp by MinLogSpace(). A false positive
+  // probability of 0 is unattainable with finite space and values >= 1 are
+  // meaningless, so reject them before allocating anything. The negated form
+  // of the comparison also rejects NaN.
+  const double fpp = data_->false_positive_probability_;
+  if (!(fpp > 0.0 && fpp < 1.0)) {
+    return Status::InvalidArgument(Substitute(
+        "false positive probability must be strictly between 0.0 and 1.0, got $0", fpp));
+  }
+
+  unique_ptr<KuduBloomFilter> bf(new KuduBloomFilter());
+  bf->data_->allocator_ = DefaultBlockBloomFilterBufferAllocator::GetSingletonSharedPtr();
+  bf->data_->bloom_filter_ = unique_ptr<BlockBloomFilter>(
+      new BlockBloomFilter(bf->data_->allocator_.get()));
+
+  const int log_space_bytes = BlockBloomFilter::MinLogSpace(data_->num_keys_, fpp);
+  RETURN_NOT_OK(bf->data_->bloom_filter_->Init(
+      log_space_bytes, ToInternalHashAlgorithm(data_->hash_algorithm_), data_->hash_seed_));
+
+  *bloom_filter_out = bf.release();
+  return Status::OK();
+}
+
+KuduBloomFilter::Data::Data(shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator,
+                            unique_ptr<BlockBloomFilter> bloom_filter)
+    : allocator_(std::move(allocator)),
+      bloom_filter_(std::move(bloom_filter)) {
+}
+
+// Clones the allocator via allocator_->Clone() and then clones the filter onto
+// that new allocator. Either step failing is fatal: a null allocator trips
+// CHECK_NOTNULL and a non-OK filter clone trips CHECK_OK.
+unique_ptr<KuduBloomFilter::Data> KuduBloomFilter::Data::Clone() const {
+  shared_ptr<BlockBloomFilterBufferAllocatorIf> new_allocator =
+      CHECK_NOTNULL(allocator_->Clone());
+
+  unique_ptr<BlockBloomFilter> new_filter;
+  CHECK_OK(bloom_filter_->Clone(new_allocator.get(), &new_filter));
+
+  return unique_ptr<Data>(new Data(std::move(new_allocator), std::move(new_filter)));
+}
+
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/client/scan_predicate.h b/src/kudu/client/scan_predicate.h
index 70be74c..99b54e4 100644
--- a/src/kudu/client/scan_predicate.h
+++ b/src/kudu/client/scan_predicate.h
@@ -19,11 +19,20 @@
 
 #ifdef KUDU_HEADERS_NO_STUBS
 #include "kudu/gutil/macros.h"
+#include "kudu/util/slice.h"
 #else
 #include "kudu/client/stubs.h"
 #endif
 
+// NOTE: using stdint.h instead of cstdint because this file is supposed
+//       to be processed by a compiler lacking C++11 support.
+#include <stdint.h>
+
+#include <cstddef>
+
+#include "kudu/client/hash.h"
 #include "kudu/util/kudu_export.h"
+#include "kudu/util/status.h"
 
 namespace kudu {
 namespace client {
@@ -52,6 +61,7 @@ class KUDU_EXPORT KuduPredicate {
   /// The PIMPL class has to be public since it's actually just an interface,
   /// and gcc gives an error trying to derive from a private nested class.
   class KUDU_NO_EXPORT Data;
+
  private:
   friend class ComparisonPredicateData;
   friend class ErrorPredicateData;
@@ -67,6 +77,102 @@ class KUDU_EXPORT KuduPredicate {
   DISALLOW_COPY_AND_ASSIGN(KuduPredicate);
 };
 
+/// @brief Bloom filter to be used with IN Bloom filter predicate.
+///
+/// A Bloom filter is a space-efficient probabilistic data structure used to
+/// test set membership with a possibility of false positive matches.
+///
+/// Create a new KuduBloomFilter using the @c KuduBloomFilterBuilder class and
+/// populate it with the column values that need to be scanned using the
+/// @c KuduBloomFilter::Insert() function.
+///
+/// Supply the populated KuduBloomFilter to
+/// @c KuduTable::NewInBloomFilterPredicate() to create an IN Bloom filter
+/// predicate.
+class KUDU_EXPORT KuduBloomFilter {
+ public:
+  /// Destroy the Bloom filter together with its internal state.
+  ~KuduBloomFilter();
+
+  /// Insert a key into the Bloom filter.
+  ///
+  /// @param[in] key
+  ///   Column value as a @c Slice to insert into the Bloom filter.
+  void Insert(const Slice& key);
+
+ private:
+  // The predicate implementation reads the underlying filter; the builder
+  // constructs instances and populates their internal state.
+  friend class InBloomFilterPredicateData;
+  friend class KuduBloomFilterBuilder;
+
+  class KUDU_NO_EXPORT Data;
+
+  // Owned ptr as per the PIMPL pattern.
+  Data* data_;
+
+  /// Clone the Bloom filter.
+  ///
+  /// @return Raw pointer to the cloned Bloom filter. Caller owns the
+  /// Bloom filter until it's passed to
+  /// @c KuduTable::NewInBloomFilterPredicate().
+  KuduBloomFilter* Clone() const;
+
+  // Private: instances are created through KuduBloomFilterBuilder::Build().
+  KuduBloomFilter();
+
+  /// Construct a Bloom filter from the KuduBloomFilter::Data pointer.
+  ///
+  /// @param [in] other_data
+  ///   Must not be null. The constructed KuduBloomFilter takes ownership of
+  ///   the pointer.
+  explicit KuduBloomFilter(Data* other_data);
+
+  DISALLOW_COPY_AND_ASSIGN(KuduBloomFilter);
+};
+
+/// @brief Builder class to help build @c KuduBloomFilter to be used with
+/// IN Bloom filter predicate.
+///
+/// Each setter returns a reference to the builder so calls can be chained
+/// before calling @c Build().
+class KUDU_EXPORT KuduBloomFilterBuilder {
+ public:
+  /// @param [in] num_keys
+  ///   Expected number of elements to be inserted in the Bloom filter.
+  ///   Together with the false positive probability, it determines the size
+  ///   of the filter created by @c Build().
+  explicit KuduBloomFilterBuilder(size_t num_keys);
+  /// Destroy the builder. Bloom filters already returned by @c Build() are
+  /// separate objects and are not affected.
+  ~KuduBloomFilterBuilder();
+
+  /// @param [in] fpp
+  ///   Desired false positive probability, expected to be strictly greater
+  ///   than 0.0 and strictly less than 1.0: a false positive probability of
+  ///   0.0 cannot be achieved by a Bloom filter of finite size.
+  ///   If not provided, defaults to 0.01.
+  /// @return Reference to the updated object.
+  KuduBloomFilterBuilder& false_positive_probability(double fpp);
+
+  /// @param [in] hash_algorithm
+  ///   Hash algorithm used to hash keys before inserting to the Bloom filter.
+  ///   If not provided, defaults to FAST_HASH.
+  /// @return Reference to the updated object.
+  KuduBloomFilterBuilder& hash_algorithm(HashAlgorithm hash_algorithm);
+
+  /// @param [in] hash_seed
+  ///   Seed used with hash algorithm to hash the keys before inserting to
+  ///   the Bloom filter.
+  ///   If not provided, defaults to 0.
+  /// @return Reference to the updated object.
+  KuduBloomFilterBuilder& hash_seed(uint32_t hash_seed);
+
+  /// Build a new Bloom filter to be used with IN Bloom filter predicate.
+  ///
+  /// The builder is not modified, so calling Build() again creates another,
+  /// separate filter configured with the same parameters.
+  ///
+  /// @param [out] bloom_filter
+  ///   On success, the created Bloom filter raw pointer. Caller owns the Bloom
+  ///   filter until it's passed to @c KuduTable::NewInBloomFilterPredicate().
+  /// @return On success, Status::OK() with the created Bloom filter in
+  ///   @c bloom_filter output parameter. On failure to allocate memory or
+  ///   invalid arguments, corresponding error status.
+  Status Build(KuduBloomFilter** bloom_filter);
+
+ private:
+  class KUDU_NO_EXPORT Data;
+
+  // Owned ptr as per the PIMPL pattern.
+  Data* data_;
+
+  DISALLOW_COPY_AND_ASSIGN(KuduBloomFilterBuilder);
+};
+
 } // namespace client
 } // namespace kudu
 #endif // KUDU_CLIENT_SCAN_PREDICATE_H
diff --git a/src/kudu/client/value.h b/src/kudu/client/value.h
index 4d76fa8..62277ff 100644
--- a/src/kudu/client/value.h
+++ b/src/kudu/client/value.h
@@ -78,6 +78,7 @@ class KUDU_EXPORT KuduValue {
   ~KuduValue();
  private:
   friend class ComparisonPredicateData;
+  friend class InBloomFilterPredicateData;
   friend class InListPredicateData;
   friend class KuduColumnSpec;
 

Reply via email to