This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 9285726facf783d41d8f8b81181070d1f356b420 Author: Bankim Bhavsar <[email protected]> AuthorDate: Fri Jan 24 11:56:49 2020 -0800 [util] Add cloning support to BlockBloomFilter and associated allocator Cloning of predicate data is required in the client, hence this change. Change-Id: I74b24ce0959930a3ee7d5df77874e42be0b50b41 Reviewed-on: http://gerrit.cloudera.org:8080/15121 Reviewed-by: Adar Dembo <[email protected]> Tested-by: Adar Dembo <[email protected]> --- src/kudu/util/block_bloom_filter-test.cc | 30 +++++++++++++++--- src/kudu/util/block_bloom_filter.cc | 52 ++++++++++++++++++++++++++++++-- src/kudu/util/block_bloom_filter.h | 45 ++++++++++++++++++++++----- 3 files changed, 113 insertions(+), 14 deletions(-) diff --git a/src/kudu/util/block_bloom_filter-test.cc b/src/kudu/util/block_bloom_filter-test.cc index 8af5cc8..978b2ad 100644 --- a/src/kudu/util/block_bloom_filter-test.cc +++ b/src/kudu/util/block_bloom_filter-test.cc @@ -31,6 +31,7 @@ #include <gtest/gtest.h> #include "kudu/util/hash.pb.h" +#include "kudu/util/memory/arena.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" @@ -45,6 +46,7 @@ class BlockBloomFilterTest : public KuduTest { public: void SetUp() override { SeedRandom(); + allocator_ = DefaultBlockBloomFilterBufferAllocator::GetSingleton(); } // Make a random uint32_t, avoiding the absent high bit and the low-entropy low bits // produced by rand(). @@ -58,8 +60,7 @@ class BlockBloomFilterTest : public KuduTest { BlockBloomFilter* CreateBloomFilter(size_t log_space_bytes) { FLAGS_disable_blockbloomfilter_avx2 = (MakeRand() & 0x1) == 0; - unique_ptr<BlockBloomFilter> bf( - new BlockBloomFilter(DefaultBlockBloomFilterBufferAllocator::GetSingleton())); + unique_ptr<BlockBloomFilter> bf(new BlockBloomFilter(allocator_)); CHECK_OK(bf->Init(log_space_bytes, FAST_HASH, 0)); bloom_filters_.emplace_back(move(bf)); return bloom_filters_.back().get(); @@ -71,6 +72,9 @@ class BlockBloomFilterTest : public KuduTest { } } + protected: + DefaultBlockBloomFilterBufferAllocator* allocator_; + private: vector<unique_ptr<BlockBloomFilter>> bloom_filters_; }; @@ -82,8 +86,26 @@ TEST_F(BlockBloomFilterTest, Constructor) { } } +TEST_F(BlockBloomFilterTest, Clone) { + Arena arena(1024); + ArenaBlockBloomFilterBufferAllocator arena_allocator(&arena); + std::shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator_clone = arena_allocator.Clone(); + + for (int log_space_bytes = 1; log_space_bytes <= 20; ++log_space_bytes) { + auto* bf = CreateBloomFilter(log_space_bytes); + int max_elems = BlockBloomFilter::MaxNdv(log_space_bytes, 0.01 /* fpp */); + while (max_elems-- > 0) { + bf->Insert(MakeRand()); + } + unique_ptr<BlockBloomFilter> bf_clone; + ASSERT_OK(bf->Clone(allocator_clone.get(), &bf_clone)); + ASSERT_NE(nullptr, bf_clone); + ASSERT_EQ(*bf_clone, *bf); + } +} + TEST_F(BlockBloomFilterTest, InvalidSpace) { - BlockBloomFilter bf(DefaultBlockBloomFilterBufferAllocator::GetSingleton()); + BlockBloomFilter bf(allocator_); // Random number in the range [38, 64). const int log_space_bytes = 38 + rand() % (64 - 38); Status s = bf.Init(log_space_bytes, FAST_HASH, 0); @@ -93,7 +115,7 @@ TEST_F(BlockBloomFilterTest, InvalidSpace) { } TEST_F(BlockBloomFilterTest, InvalidHashAlgorithm) { - BlockBloomFilter bf(DefaultBlockBloomFilterBufferAllocator::GetSingleton()); + BlockBloomFilter bf(allocator_); Status s = bf.Init(4, UNKNOWN_HASH, 0); ASSERT_TRUE(s.IsInvalidArgument()); ASSERT_STR_CONTAINS(s.ToString(), "Invalid/Unsupported hash algorithm"); diff --git a/src/kudu/util/block_bloom_filter.cc b/src/kudu/util/block_bloom_filter.cc index 90b0a3d..572119b 100644 --- a/src/kudu/util/block_bloom_filter.cc +++ b/src/kudu/util/block_bloom_filter.cc @@ -28,12 +28,13 @@ #include <gflags/gflags.h> -#include "kudu/gutil/singleton.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/util/block_bloom_filter.pb.h" #include "kudu/util/flag_tags.h" #include "kudu/util/memory/arena.h" +using std::shared_ptr; +using std::unique_ptr; using strings::Substitute; DEFINE_bool(disable_blockbloomfilter_avx2, false, @@ -131,6 +132,20 @@ Status BlockBloomFilter::InitFromPB(const BlockBloomFilterPB& bf_src) { return Status::OK(); } +Status BlockBloomFilter::Clone(BlockBloomFilterBufferAllocatorIf* allocator, + unique_ptr<BlockBloomFilter>* bf_out) const { + unique_ptr<BlockBloomFilter> bf_clone(new BlockBloomFilter(allocator)); + + RETURN_NOT_OK(bf_clone->InitInternal(log_space_bytes(), hash_algorithm_, hash_seed_)); + DCHECK_NOTNULL(bf_clone->directory_); + CHECK_EQ(bf_clone->directory_size(), directory_size()); + memcpy(bf_clone->directory_, directory_, bf_clone->directory_size()); + bf_clone->always_false_ = always_false_; + + *bf_out = std::move(bf_clone); + return Status::OK(); +} + void BlockBloomFilter::Close() { if (directory_ != nullptr) { buffer_allocator_->FreeBuffer(directory_); @@ -244,6 +259,25 @@ bool BlockBloomFilter::operator!=(const BlockBloomFilter& rhs) const { return !(rhs == *this); } +shared_ptr<DefaultBlockBloomFilterBufferAllocator> + DefaultBlockBloomFilterBufferAllocator::GetSingletonSharedPtr() { + // Meyer's Singleton. + // Static variable initialization is thread-safe in C++11. + static shared_ptr<DefaultBlockBloomFilterBufferAllocator> instance = + DefaultBlockBloomFilterBufferAllocator::make_shared(); + + return instance; +} + +DefaultBlockBloomFilterBufferAllocator* DefaultBlockBloomFilterBufferAllocator::GetSingleton() { + return GetSingletonSharedPtr().get(); +} + +shared_ptr<BlockBloomFilterBufferAllocatorIf> + DefaultBlockBloomFilterBufferAllocator::Clone() const { + return GetSingletonSharedPtr(); +} + Status DefaultBlockBloomFilterBufferAllocator::AllocateBuffer(size_t bytes, void** ptr) { int ret_code = posix_memalign(ptr, CACHELINE_SIZE, bytes); return ret_code == 0 ? Status::OK() : @@ -254,8 +288,20 @@ void DefaultBlockBloomFilterBufferAllocator::FreeBuffer(void* ptr) { free(DCHECK_NOTNULL(ptr)); } -DefaultBlockBloomFilterBufferAllocator* DefaultBlockBloomFilterBufferAllocator::GetSingleton() { - return Singleton<DefaultBlockBloomFilterBufferAllocator>::get(); +ArenaBlockBloomFilterBufferAllocator::ArenaBlockBloomFilterBufferAllocator(Arena* arena) : + arena_(DCHECK_NOTNULL(arena)), + is_arena_owned_(false) { +} + +ArenaBlockBloomFilterBufferAllocator::ArenaBlockBloomFilterBufferAllocator() : + arena_(new Arena(1024)), + is_arena_owned_(true) { +} + +ArenaBlockBloomFilterBufferAllocator::~ArenaBlockBloomFilterBufferAllocator() { + if (is_arena_owned_) { + delete arena_; + } } Status ArenaBlockBloomFilterBufferAllocator::AllocateBuffer(size_t bytes, void** ptr) { diff --git a/src/kudu/util/block_bloom_filter.h b/src/kudu/util/block_bloom_filter.h index 9b5f9be..a218dec 100644 --- a/src/kudu/util/block_bloom_filter.h +++ b/src/kudu/util/block_bloom_filter.h @@ -21,6 +21,7 @@ #include <cstddef> #include <cstdint> #include <limits> +#include <memory> #include <gflags/gflags_declare.h> #include <glog/logging.h> @@ -30,6 +31,7 @@ #include "kudu/gutil/port.h" #include "kudu/util/hash.pb.h" #include "kudu/util/hash_util.h" +#include "kudu/util/make_shared.h" #include "kudu/util/slice.h" #include "kudu/util/status.h" @@ -89,6 +91,14 @@ class BlockBloomFilter { Status Init(int log_space_bytes, HashAlgorithm hash_algorithm, uint32_t hash_seed); // Initialize the BlockBloomFilter by de-serializing the protobuf message. Status InitFromPB(const BlockBloomFilterPB& bf_src); + + // Clones the BlockBloomFilter using the supplied "allocator". The allocator is expected + // to remain valid during the lifetime of the cloned BlockBloomFilter. + // On success, returns Status::OK with cloned BlockBloomFilter in "bf_out" output parameter. + // On failure, returns error status. + Status Clone(BlockBloomFilterBufferAllocatorIf* allocator, + std::unique_ptr<BlockBloomFilter>* bf_out) const; + void Close(); // Adds an element to the BloomFilter. The function used to generate 'hash' need not @@ -253,28 +263,42 @@ class BlockBloomFilter { // Generic interface to allocate and de-allocate memory for the BlockBloomFilter. class BlockBloomFilterBufferAllocatorIf { public: + virtual ~BlockBloomFilterBufferAllocatorIf() = default; virtual Status AllocateBuffer(size_t bytes, void** ptr) = 0; virtual void FreeBuffer(void* ptr) = 0; + // Clones the allocator. + virtual std::shared_ptr<BlockBloomFilterBufferAllocatorIf> Clone() const = 0; }; -class DefaultBlockBloomFilterBufferAllocator : public BlockBloomFilterBufferAllocatorIf { +// Default allocator implemented as Singleton. +class DefaultBlockBloomFilterBufferAllocator : + public BlockBloomFilterBufferAllocatorIf, + public enable_make_shared<DefaultBlockBloomFilterBufferAllocator> { public: - // Required for Singleton. - DefaultBlockBloomFilterBufferAllocator() = default; + static std::shared_ptr<DefaultBlockBloomFilterBufferAllocator> GetSingletonSharedPtr(); + static DefaultBlockBloomFilterBufferAllocator* GetSingleton(); + ~DefaultBlockBloomFilterBufferAllocator() override = default; Status AllocateBuffer(size_t bytes, void** ptr) override; void FreeBuffer(void* ptr) override; + std::shared_ptr<BlockBloomFilterBufferAllocatorIf> Clone() const override; - static DefaultBlockBloomFilterBufferAllocator* GetSingleton(); + protected: + // Protected default constructor to allow using make_shared() + DefaultBlockBloomFilterBufferAllocator() = default; private: DISALLOW_COPY_AND_ASSIGN(DefaultBlockBloomFilterBufferAllocator); }; class ArenaBlockBloomFilterBufferAllocator : public BlockBloomFilterBufferAllocatorIf { public: - // Arena is expected to remain valid during the lifetime of the allocator. - explicit ArenaBlockBloomFilterBufferAllocator(Arena* arena) : arena_(arena) {} - ArenaBlockBloomFilterBufferAllocator() : arena_(nullptr) {} + // Default constructor with arena that's owned by the allocator. + ArenaBlockBloomFilterBufferAllocator(); + + // "arena" is expected to remain valid during the lifetime of the allocator. + explicit ArenaBlockBloomFilterBufferAllocator(Arena* arena); + + ~ArenaBlockBloomFilterBufferAllocator() override; Status AllocateBuffer(size_t bytes, void** ptr) override; @@ -282,8 +306,15 @@ class ArenaBlockBloomFilterBufferAllocator : public BlockBloomFilterBufferAlloca // NOP. Buffer will be de-allocated when the arena is destructed. } + std::shared_ptr<BlockBloomFilterBufferAllocatorIf> Clone() const override { + return std::make_shared<ArenaBlockBloomFilterBufferAllocator>(); + }; + private: Arena* arena_; + bool is_arena_owned_; + + DISALLOW_COPY_AND_ASSIGN(ArenaBlockBloomFilterBufferAllocator); }; } // namespace kudu
