This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 9285726facf783d41d8f8b81181070d1f356b420
Author: Bankim Bhavsar <[email protected]>
AuthorDate: Fri Jan 24 11:56:49 2020 -0800

    [util] Add cloning support to BlockBloomFilter and associated allocator
    
    Cloning of predicate data is required in the client, hence this change.
    
    Change-Id: I74b24ce0959930a3ee7d5df77874e42be0b50b41
    Reviewed-on: http://gerrit.cloudera.org:8080/15121
    Reviewed-by: Adar Dembo <[email protected]>
    Tested-by: Adar Dembo <[email protected]>
---
 src/kudu/util/block_bloom_filter-test.cc | 30 +++++++++++++++---
 src/kudu/util/block_bloom_filter.cc      | 52 ++++++++++++++++++++++++++++++--
 src/kudu/util/block_bloom_filter.h       | 45 ++++++++++++++++++++++-----
 3 files changed, 113 insertions(+), 14 deletions(-)

diff --git a/src/kudu/util/block_bloom_filter-test.cc 
b/src/kudu/util/block_bloom_filter-test.cc
index 8af5cc8..978b2ad 100644
--- a/src/kudu/util/block_bloom_filter-test.cc
+++ b/src/kudu/util/block_bloom_filter-test.cc
@@ -31,6 +31,7 @@
 #include <gtest/gtest.h>
 
 #include "kudu/util/hash.pb.h"
+#include "kudu/util/memory/arena.h"
 #include "kudu/util/status.h"
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
@@ -45,6 +46,7 @@ class BlockBloomFilterTest : public KuduTest {
  public:
   void SetUp() override {
     SeedRandom();
+    allocator_ = DefaultBlockBloomFilterBufferAllocator::GetSingleton();
   }
   // Make a random uint32_t, avoiding the absent high bit and the low-entropy 
low bits
   // produced by rand().
@@ -58,8 +60,7 @@ class BlockBloomFilterTest : public KuduTest {
   BlockBloomFilter* CreateBloomFilter(size_t log_space_bytes) {
     FLAGS_disable_blockbloomfilter_avx2 = (MakeRand() & 0x1) == 0;
 
-    unique_ptr<BlockBloomFilter> bf(
-        new 
BlockBloomFilter(DefaultBlockBloomFilterBufferAllocator::GetSingleton()));
+    unique_ptr<BlockBloomFilter> bf(new BlockBloomFilter(allocator_));
     CHECK_OK(bf->Init(log_space_bytes, FAST_HASH, 0));
     bloom_filters_.emplace_back(move(bf));
     return bloom_filters_.back().get();
@@ -71,6 +72,9 @@ class BlockBloomFilterTest : public KuduTest {
     }
   }
 
+ protected:
+  DefaultBlockBloomFilterBufferAllocator* allocator_;
+
  private:
   vector<unique_ptr<BlockBloomFilter>> bloom_filters_;
 };
@@ -82,8 +86,26 @@ TEST_F(BlockBloomFilterTest, Constructor) {
   }
 }
 
+TEST_F(BlockBloomFilterTest, Clone) {
+  Arena arena(1024);
+  ArenaBlockBloomFilterBufferAllocator arena_allocator(&arena);
+  std::shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator_clone = 
arena_allocator.Clone();
+
+  for (int log_space_bytes = 1; log_space_bytes <= 20; ++log_space_bytes) {
+    auto* bf = CreateBloomFilter(log_space_bytes);
+    int max_elems = BlockBloomFilter::MaxNdv(log_space_bytes, 0.01 /* fpp */);
+    while (max_elems-- > 0) {
+      bf->Insert(MakeRand());
+    }
+    unique_ptr<BlockBloomFilter> bf_clone;
+    ASSERT_OK(bf->Clone(allocator_clone.get(), &bf_clone));
+    ASSERT_NE(nullptr, bf_clone);
+    ASSERT_EQ(*bf_clone, *bf);
+  }
+}
+
 TEST_F(BlockBloomFilterTest, InvalidSpace) {
-  BlockBloomFilter bf(DefaultBlockBloomFilterBufferAllocator::GetSingleton());
+  BlockBloomFilter bf(allocator_);
   // Random number in the range [38, 64).
   const int log_space_bytes = 38 + rand() % (64 - 38);
   Status s = bf.Init(log_space_bytes, FAST_HASH, 0);
@@ -93,7 +115,7 @@ TEST_F(BlockBloomFilterTest, InvalidSpace) {
 }
 
 TEST_F(BlockBloomFilterTest, InvalidHashAlgorithm) {
-  BlockBloomFilter bf(DefaultBlockBloomFilterBufferAllocator::GetSingleton());
+  BlockBloomFilter bf(allocator_);
   Status s = bf.Init(4, UNKNOWN_HASH, 0);
   ASSERT_TRUE(s.IsInvalidArgument());
   ASSERT_STR_CONTAINS(s.ToString(), "Invalid/Unsupported hash algorithm");
diff --git a/src/kudu/util/block_bloom_filter.cc 
b/src/kudu/util/block_bloom_filter.cc
index 90b0a3d..572119b 100644
--- a/src/kudu/util/block_bloom_filter.cc
+++ b/src/kudu/util/block_bloom_filter.cc
@@ -28,12 +28,13 @@
 
 #include <gflags/gflags.h>
 
-#include "kudu/gutil/singleton.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/block_bloom_filter.pb.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/memory/arena.h"
 
+using std::shared_ptr;
+using std::unique_ptr;
 using strings::Substitute;
 
 DEFINE_bool(disable_blockbloomfilter_avx2, false,
@@ -131,6 +132,20 @@ Status BlockBloomFilter::InitFromPB(const 
BlockBloomFilterPB& bf_src) {
   return Status::OK();
 }
 
+Status BlockBloomFilter::Clone(BlockBloomFilterBufferAllocatorIf* allocator,
+                               unique_ptr<BlockBloomFilter>* bf_out) const {
+  unique_ptr<BlockBloomFilter> bf_clone(new BlockBloomFilter(allocator));
+
+  RETURN_NOT_OK(bf_clone->InitInternal(log_space_bytes(), hash_algorithm_, 
hash_seed_));
+  DCHECK_NOTNULL(bf_clone->directory_);
+  CHECK_EQ(bf_clone->directory_size(), directory_size());
+  memcpy(bf_clone->directory_, directory_, bf_clone->directory_size());
+  bf_clone->always_false_ = always_false_;
+
+  *bf_out = std::move(bf_clone);
+  return Status::OK();
+}
+
 void BlockBloomFilter::Close() {
   if (directory_ != nullptr) {
     buffer_allocator_->FreeBuffer(directory_);
@@ -244,6 +259,25 @@ bool BlockBloomFilter::operator!=(const BlockBloomFilter& 
rhs) const {
   return !(rhs == *this);
 }
 
+shared_ptr<DefaultBlockBloomFilterBufferAllocator>
+    DefaultBlockBloomFilterBufferAllocator::GetSingletonSharedPtr() {
+  // Meyer's Singleton.
+  // Static variable initialization is thread-safe in C++11.
+  static shared_ptr<DefaultBlockBloomFilterBufferAllocator> instance =
+      DefaultBlockBloomFilterBufferAllocator::make_shared();
+
+  return instance;
+}
+
+DefaultBlockBloomFilterBufferAllocator* 
DefaultBlockBloomFilterBufferAllocator::GetSingleton() {
+  return GetSingletonSharedPtr().get();
+}
+
+shared_ptr<BlockBloomFilterBufferAllocatorIf>
+    DefaultBlockBloomFilterBufferAllocator::Clone() const {
+  return GetSingletonSharedPtr();
+}
+
 Status DefaultBlockBloomFilterBufferAllocator::AllocateBuffer(size_t bytes, 
void** ptr) {
   int ret_code = posix_memalign(ptr, CACHELINE_SIZE, bytes);
   return ret_code == 0 ? Status::OK() :
@@ -254,8 +288,20 @@ void 
DefaultBlockBloomFilterBufferAllocator::FreeBuffer(void* ptr) {
   free(DCHECK_NOTNULL(ptr));
 }
 
-DefaultBlockBloomFilterBufferAllocator* 
DefaultBlockBloomFilterBufferAllocator::GetSingleton() {
-  return Singleton<DefaultBlockBloomFilterBufferAllocator>::get();
+ArenaBlockBloomFilterBufferAllocator::ArenaBlockBloomFilterBufferAllocator(Arena*
 arena) :
+  arena_(DCHECK_NOTNULL(arena)),
+  is_arena_owned_(false) {
+}
+
+ArenaBlockBloomFilterBufferAllocator::ArenaBlockBloomFilterBufferAllocator() :
+  arena_(new Arena(1024)),
+  is_arena_owned_(true) {
+}
+
+ArenaBlockBloomFilterBufferAllocator::~ArenaBlockBloomFilterBufferAllocator() {
+  if (is_arena_owned_) {
+    delete arena_;
+  }
 }
 
 Status ArenaBlockBloomFilterBufferAllocator::AllocateBuffer(size_t bytes, 
void** ptr) {
diff --git a/src/kudu/util/block_bloom_filter.h 
b/src/kudu/util/block_bloom_filter.h
index 9b5f9be..a218dec 100644
--- a/src/kudu/util/block_bloom_filter.h
+++ b/src/kudu/util/block_bloom_filter.h
@@ -21,6 +21,7 @@
 #include <cstddef>
 #include <cstdint>
 #include <limits>
+#include <memory>
 
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
@@ -30,6 +31,7 @@
 #include "kudu/gutil/port.h"
 #include "kudu/util/hash.pb.h"
 #include "kudu/util/hash_util.h"
+#include "kudu/util/make_shared.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 
@@ -89,6 +91,14 @@ class BlockBloomFilter {
   Status Init(int log_space_bytes, HashAlgorithm hash_algorithm, uint32_t 
hash_seed);
   // Initialize the BlockBloomFilter by de-serializing the protobuf message.
   Status InitFromPB(const BlockBloomFilterPB& bf_src);
+
+  // Clones the BlockBloomFilter using the supplied "allocator". The allocator 
is expected
+  // to remain valid during the lifetime of the cloned BlockBloomFilter.
+  // On success, returns Status::OK with cloned BlockBloomFilter in "bf_out" 
output parameter.
+  // On failure, returns error status.
+  Status Clone(BlockBloomFilterBufferAllocatorIf* allocator,
+               std::unique_ptr<BlockBloomFilter>* bf_out) const;
+
   void Close();
 
   // Adds an element to the BloomFilter. The function used to generate 'hash' 
need not
@@ -253,28 +263,42 @@ class BlockBloomFilter {
 // Generic interface to allocate and de-allocate memory for the 
BlockBloomFilter.
 class BlockBloomFilterBufferAllocatorIf {
  public:
+  virtual ~BlockBloomFilterBufferAllocatorIf() = default;
   virtual Status AllocateBuffer(size_t bytes, void** ptr) = 0;
   virtual void FreeBuffer(void* ptr) = 0;
+  // Clones the allocator.
+  virtual std::shared_ptr<BlockBloomFilterBufferAllocatorIf> Clone() const = 0;
 };
 
-class DefaultBlockBloomFilterBufferAllocator : public 
BlockBloomFilterBufferAllocatorIf {
+// Default allocator implemented as Singleton.
+class DefaultBlockBloomFilterBufferAllocator :
+    public BlockBloomFilterBufferAllocatorIf,
+    public enable_make_shared<DefaultBlockBloomFilterBufferAllocator> {
  public:
-  // Required for Singleton.
-  DefaultBlockBloomFilterBufferAllocator() = default;
+  static std::shared_ptr<DefaultBlockBloomFilterBufferAllocator> 
GetSingletonSharedPtr();
+  static DefaultBlockBloomFilterBufferAllocator* GetSingleton();
+  ~DefaultBlockBloomFilterBufferAllocator() override = default;
 
   Status AllocateBuffer(size_t bytes, void** ptr) override;
   void FreeBuffer(void* ptr) override;
+  std::shared_ptr<BlockBloomFilterBufferAllocatorIf> Clone() const override;
 
-  static DefaultBlockBloomFilterBufferAllocator* GetSingleton();
+ protected:
+  // Protected default constructor to allow using make_shared()
+  DefaultBlockBloomFilterBufferAllocator() = default;
  private:
   DISALLOW_COPY_AND_ASSIGN(DefaultBlockBloomFilterBufferAllocator);
 };
 
 class ArenaBlockBloomFilterBufferAllocator : public 
BlockBloomFilterBufferAllocatorIf {
  public:
-  // Arena is expected to remain valid during the lifetime of the allocator.
-  explicit ArenaBlockBloomFilterBufferAllocator(Arena* arena) : arena_(arena) 
{}
-  ArenaBlockBloomFilterBufferAllocator() : arena_(nullptr) {}
+  // Default constructor with arena that's owned by the allocator.
+  ArenaBlockBloomFilterBufferAllocator();
+
+  // "arena" is expected to remain valid during the lifetime of the allocator.
+  explicit ArenaBlockBloomFilterBufferAllocator(Arena* arena);
+
+  ~ArenaBlockBloomFilterBufferAllocator() override;
 
   Status AllocateBuffer(size_t bytes, void** ptr) override;
 
@@ -282,8 +306,15 @@ class ArenaBlockBloomFilterBufferAllocator : public 
BlockBloomFilterBufferAlloca
     // NOP. Buffer will be de-allocated when the arena is destructed.
   }
 
+  std::shared_ptr<BlockBloomFilterBufferAllocatorIf> Clone() const override {
+    return std::make_shared<ArenaBlockBloomFilterBufferAllocator>();
+  };
+
  private:
   Arena* arena_;
+  bool is_arena_owned_;
+
+  DISALLOW_COPY_AND_ASSIGN(ArenaBlockBloomFilterBufferAllocator);
 };
 
 }  // namespace kudu

Reply via email to