Repository: incubator-quickstep Updated Branches: refs/heads/aggregate-on-left-outer-join 6b2dbafc2 -> a28b1e4d7 (forced update)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/relational_operators/BuildAggregationExistenceMapOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.hpp b/relational_operators/BuildAggregationExistenceMapOperator.hpp
new file mode 100644
index 0000000..e2928a8
--- /dev/null
+++ b/relational_operators/BuildAggregationExistenceMapOperator.hpp
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
+
+#include <cstddef>
+
+#include <string>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class AggregationOperationState;
+class CatalogRelationSchema;
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ * @{
+ */
+
+/**
+ * @brief An operator which builds a bit vector on the input relation's one
+ *        attribute where the bit vector serves as the existence map for an
+ *        AggregationOperationState's CollisionFreeVectorTable.
+ **/
+class BuildAggregationExistenceMapOperator : public RelationalOperator {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this operator belongs.
+   * @param input_relation The relation to build the existence map on.
+   * @param build_attribute The ID of the attribute to build the existence map on.
+   * @param input_relation_is_stored If input_relation is a stored relation and
+   *        is fully available to the operator before it can start generating
+   *        workorders.
+   * @param aggr_state_index The index of the AggregationState in QueryContext.
+   **/
+  BuildAggregationExistenceMapOperator(const std::size_t query_id,
+                                       const CatalogRelation &input_relation,
+                                       const attribute_id build_attribute,
+                                       const bool input_relation_is_stored,
+                                       const QueryContext::aggregation_state_id aggr_state_index)
+      : RelationalOperator(query_id),
+        input_relation_(input_relation),
+        build_attribute_(build_attribute),
+        input_relation_is_stored_(input_relation_is_stored),
+        input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
+                                                           : std::vector<block_id>()),
+        aggr_state_index_(aggr_state_index),
+        num_workorders_generated_(0),
+        started_(false) {}
+
+  ~BuildAggregationExistenceMapOperator() override {}
+
+  /**
+   * @return The name of this operator.
+   **/
+  std::string getName() const override {
+    return "BuildAggregationExistenceMapOperator";
+  }
+
+  /**
+   * @return The input relation.
+   */
+  const CatalogRelation& input_relation() const {
+    return input_relation_;
+  }
+
+  // The two work-order generators below override the RelationalOperator
+  // interface and are only declared here; their definitions are out of line.
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+  /**
+   * @brief Record one more input block for this operator.
+   *
+   * @param input_block_id The block ID, appended to input_relation_block_ids_.
+   * @param input_relation_id Not used by this implementation.
+   * @param part_id Not used by this implementation.
+   **/
+  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+                      const partition_id part_id) override {
+    input_relation_block_ids_.push_back(input_block_id);
+  }
+
+ private:
+  // Declared here, defined out of line. Presumably serializes the work order
+  // for one input block -- TODO confirm against the .cpp.
+  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+
+  const CatalogRelation &input_relation_;
+  const attribute_id build_attribute_;
+  const bool input_relation_is_stored_;
+  // Initialized from input_relation.getBlocksSnapshot() when the input is
+  // stored, otherwise empty; feedInputBlock() appends to it.
+  std::vector<block_id> input_relation_block_ids_;
+  const QueryContext::aggregation_state_id aggr_state_index_;
+
+  // Initialized to 0 and false by the constructor; not modified anywhere in
+  // this header.
+  std::vector<block_id>::size_type num_workorders_generated_;
+  bool started_;
+
+  DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapOperator);
+};
+
+/**
+ * @brief A WorkOrder produced by BuildAggregationExistenceMapOperator.
+ **/
+class BuildAggregationExistenceMapWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor
+   *
+   * @param query_id The ID of this query.
+   * @param input_relation The relation to build the existence map on.
+   * @param build_block_id The block id.
+   * @param build_attribute The ID of the attribute to build on.
+   * @param state The AggregationState to use. Must not be NULL.
+   * @param storage_manager The StorageManager to use. Must not be NULL.
+   **/
+  BuildAggregationExistenceMapWorkOrder(const std::size_t query_id,
+                                        const CatalogRelationSchema &input_relation,
+                                        const block_id build_block_id,
+                                        const attribute_id build_attribute,
+                                        AggregationOperationState *state,
+                                        StorageManager *storage_manager)
+      : WorkOrder(query_id),
+        input_relation_(input_relation),
+        build_block_id_(build_block_id),
+        build_attribute_(build_attribute),
+        state_(DCHECK_NOTNULL(state)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
+  ~BuildAggregationExistenceMapWorkOrder() override {}
+
+  // Overrides the WorkOrder interface; defined out of line.
+  void execute() override;
+
+ private:
+  const CatalogRelationSchema &input_relation_;
+  const block_id build_block_id_;
+  const attribute_id build_attribute_;
+  AggregationOperationState *state_;
+
+  StorageManager *storage_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapWorkOrder);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index df4114d..457d58a 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -33,6 +33,9 @@ set_gflags_lib_name ()

 # Declare micro-libs:
 add_library(quickstep_relationaloperators_AggregationOperator AggregationOperator.cpp AggregationOperator.hpp)
+add_library(quickstep_relationaloperators_BuildAggregationExistenceMapOperator + BuildAggregationExistenceMapOperator.cpp + BuildAggregationExistenceMapOperator.hpp) add_library(quickstep_relationaloperators_BuildHashOperator BuildHashOperator.cpp BuildHashOperator.hpp) add_library(quickstep_relationaloperators_BuildLIPFilterOperator BuildLIPFilterOperator.cpp BuildLIPFilterOperator.hpp) add_library(quickstep_relationaloperators_CreateIndexOperator CreateIndexOperator.cpp CreateIndexOperator.hpp) @@ -95,6 +98,31 @@ target_link_libraries(quickstep_relationaloperators_AggregationOperator quickstep_utility_lipfilter_LIPFilterAdaptiveProber quickstep_utility_lipfilter_LIPFilterUtil tmb) +target_link_libraries(quickstep_relationaloperators_BuildAggregationExistenceMapOperator + glog + quickstep_catalog_CatalogAttribute + quickstep_catalog_CatalogRelation + quickstep_catalog_CatalogRelationSchema + quickstep_catalog_CatalogTypedefs + quickstep_queryexecution_QueryContext + quickstep_queryexecution_WorkOrderProtosContainer + quickstep_queryexecution_WorkOrdersContainer + quickstep_relationaloperators_RelationalOperator + quickstep_relationaloperators_WorkOrder + quickstep_relationaloperators_WorkOrder_proto + quickstep_storage_AggregationOperationState + quickstep_storage_CollisionFreeVectorTable + quickstep_storage_StorageBlock + quickstep_storage_StorageBlockInfo + quickstep_storage_StorageManager + quickstep_storage_TupleStorageSubBlock + quickstep_storage_ValueAccessor + quickstep_storage_ValueAccessorUtil + quickstep_types_Type + quickstep_types_TypeID + quickstep_utility_BarrieredReadWriteConcurrentBitVector + quickstep_utility_Macros + tmb) target_link_libraries(quickstep_relationaloperators_BuildHashOperator glog quickstep_catalog_CatalogRelation @@ -518,6 +546,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory quickstep_catalog_CatalogTypedefs quickstep_queryexecution_QueryContext quickstep_relationaloperators_AggregationOperator + 
quickstep_relationaloperators_BuildAggregationExistenceMapOperator quickstep_relationaloperators_BuildHashOperator quickstep_relationaloperators_BuildLIPFilterOperator quickstep_relationaloperators_DeleteOperator @@ -552,6 +581,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrder_proto add_library(quickstep_relationaloperators ../empty_src.cpp RelationalOperatorsModule.hpp) target_link_libraries(quickstep_relationaloperators quickstep_relationaloperators_AggregationOperator + quickstep_relationaloperators_BuildAggregationExistenceMapOperator quickstep_relationaloperators_BuildLIPFilterOperator quickstep_relationaloperators_BuildHashOperator quickstep_relationaloperators_CreateIndexOperator http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/relational_operators/WorkOrder.proto ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto index 76753d2..d0d0753 100644 --- a/relational_operators/WorkOrder.proto +++ b/relational_operators/WorkOrder.proto @@ -44,6 +44,7 @@ enum WorkOrderType { UPDATE = 20; WINDOW_AGGREGATION = 21; DESTROY_AGGREGATION_STATE = 22; + BUILD_AGGREGATION_EXISTENCE_MAP = 23; } message WorkOrder { @@ -278,6 +279,15 @@ message WindowAggregationWorkOrder { message DestroyAggregationStateWorkOrder { extend WorkOrder { - optional uint32 aggr_state_index = 339; + optional uint32 aggr_state_index = 352; + } +} + +message BuildAggregationExistenceMapWorkOrder { + extend WorkOrder { + optional int32 relation_id = 368; + optional fixed64 build_block_id = 369; + optional int32 build_attribute = 370; + optional uint32 aggr_state_index = 371; } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/relational_operators/WorkOrderFactory.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrderFactory.cpp 
b/relational_operators/WorkOrderFactory.cpp
index bd2a0f8..d2c8251 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -29,6 +29,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "relational_operators/AggregationOperator.hpp"
+#include "relational_operators/BuildAggregationExistenceMapOperator.hpp"
 #include "relational_operators/BuildHashOperator.hpp"
 #include "relational_operators/BuildLIPFilterOperator.hpp"
 #include "relational_operators/DeleteOperator.hpp"
@@ -91,6 +92,21 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           CreateLIPFilterAdaptiveProberHelper(
               proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index), query_context));
     }
+    case serialization::BUILD_AGGREGATION_EXISTENCE_MAP: {
+      LOG(INFO) << "Creating BuildAggregationExistenceMapWorkOrder in Shiftboss " << shiftboss_index;
+
+      return new BuildAggregationExistenceMapWorkOrder(
+          proto.query_id(),
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id)),
+          proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_block_id),
+          proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute),
+          // Read the state index from this work order's own extension -- the one
+          // ProtoIsValid() checks -- not from AggregationWorkOrder's.
+          query_context->getAggregationState(
+              proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index)),
+          storage_manager);
+    }
     case serialization::BUILD_LIP_FILTER: {
       LOG(INFO) << "Creating BuildLIPFilterWorkOrder in Shiftboss " << shiftboss_index;

@@ -525,6 +541,32 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
              query_context.isValidAggregationStateId(
                  proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index));
     }
+    case serialization::BUILD_AGGREGATION_EXISTENCE_MAP: {
+      // Both extensions are read below, so require them up front; GetExtension()
+      // on an unset field would silently yield the default value.
+      if (!proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id) ||
+          !proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute)) {
+        return false;
+      }
+
+      const relation_id rel_id =
+          proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id);
+      if (!catalog_database.hasRelationWithId(rel_id)) {
+        return false;
+      }
+
+      const CatalogRelationSchema &relation = catalog_database.getRelationSchemaById(rel_id);
+      const attribute_id build_attribute =
+          proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_attribute);
+      if (!relation.hasAttributeWithId(build_attribute)) {
+        return false;
+      }
+
+      return proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_block_id) &&
+             proto.HasExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index) &&
+             query_context.isValidAggregationStateId(
+                 proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::aggr_state_index));
+    }
     case serialization::BUILD_HASH: {
       if (!proto.HasExtension(serialization::BuildHashWorkOrder::relation_id)) {
         return false;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 0b34908..0f39b41 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -412,12 +412,18 @@ std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
   }
 }

+CollisionFreeVectorTable* AggregationOperationState
+    ::getCollisionFreeVectorTable() const {
+  return static_cast<CollisionFreeVectorTable *>(
+      collision_free_hashtable_.get());
+}
+
 void AggregationOperationState::initialize(const std::size_t partition_id) {
   if (is_aggregate_collision_free_) {
     static_cast<CollisionFreeVectorTable *>(
         collision_free_hashtable_.get())->initialize(partition_id);
   } else {
-    LOG(FATAL) << "AggregationOperationState::initializeState() "
+    LOG(FATAL) << "AggregationOperationState::initialize() "
                << "is not
supported by this aggregation"; } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index 13ee377..c8930ee 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -41,6 +41,7 @@ namespace serialization { class AggregationOperationState; } class AggregateFunction; class CatalogDatabaseLite; class CatalogRelationSchema; +class CollisionFreeVectorTable; class InsertDestination; class LIPFilterAdaptiveProber; class StorageManager; @@ -198,6 +199,14 @@ class AggregationOperationState { void finalizeAggregate(const std::size_t partition_id, InsertDestination *output_destination); + /** + * @brief Get the collision-free vector table used by this aggregation. + * + * @return The collision-free vector table used by this aggregation. + * Returns NULL if collision-free vector table is not used. + */ + CollisionFreeVectorTable* getCollisionFreeVectorTable() const; + private: // Check whether partitioned aggregation can be applied. bool checkAggregatePartitioned( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/storage/CollisionFreeVectorTable.hpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp index 4f3e238..772d47d 100644 --- a/storage/CollisionFreeVectorTable.hpp +++ b/storage/CollisionFreeVectorTable.hpp @@ -105,6 +105,15 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase { } /** + * @brief Get the existence map for this vector table. + * + * @return The existence map for this vector table. 
+ */ + inline BarrieredReadWriteConcurrentBitVector* getExistenceMap() const { + return existence_map_.get(); + } + + /** * @brief Initialize the specified partition of this aggregation table. * * @param partition_id ID of the partition to be initialized. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/utility/lip_filter/BitVectorExactFilter.hpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/BitVectorExactFilter.hpp b/utility/lip_filter/BitVectorExactFilter.hpp index 6ad0567..48fd5e1 100644 --- a/utility/lip_filter/BitVectorExactFilter.hpp +++ b/utility/lip_filter/BitVectorExactFilter.hpp @@ -20,17 +20,16 @@ #ifndef QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_ #define QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_ -#include <atomic> +#include <cstddef> #include <cstdint> -#include <cstring> #include <vector> #include "catalog/CatalogTypedefs.hpp" #include "storage/StorageBlockInfo.hpp" -#include "storage/StorageConstants.hpp" #include "storage/ValueAccessor.hpp" #include "storage/ValueAccessorUtil.hpp" #include "types/Type.hpp" +#include "utility/BarrieredReadWriteConcurrentBitVector.hpp" #include "utility/Macros.hpp" #include "utility/lip_filter/LIPFilter.hpp" @@ -64,14 +63,10 @@ class BitVectorExactFilter : public LIPFilter { : LIPFilter(LIPFilterType::kBitVectorExactFilter), min_value_(static_cast<CppType>(min_value)), max_value_(static_cast<CppType>(max_value)), - bit_array_(GetByteSize(max_value - min_value + 1)) { + bit_vector_(max_value - min_value + 1) { DCHECK_EQ(min_value, static_cast<std::int64_t>(min_value_)); DCHECK_EQ(max_value, static_cast<std::int64_t>(max_value_)); DCHECK_GE(max_value_, min_value_); - - std::memset(bit_array_.data(), - 0x0, - sizeof(std::atomic<std::uint8_t>) * GetByteSize(max_value - min_value + 1)); } void insertValueAccessor(ValueAccessor *accessor, @@ -109,13 +104,6 @@ class BitVectorExactFilter : public LIPFilter { private: 
/** - * @brief Round up bit_size to multiples of 8. - */ - inline static std::size_t GetByteSize(const std::size_t bit_size) { - return (bit_size + 7u) / 8u; - } - - /** * @brief Iterate through the accessor and hash values into the internal bit * array. */ @@ -164,8 +152,7 @@ class BitVectorExactFilter : public LIPFilter { DCHECK_GE(value, min_value_); DCHECK_LE(value, max_value_); - const CppType loc = value - min_value_; - bit_array_[loc >> 3u].fetch_or(1u << (loc & 7u), std::memory_order_relaxed); + bit_vector_.setBit(value - min_value_); } /** @@ -177,9 +164,7 @@ class BitVectorExactFilter : public LIPFilter { return is_anti_filter; } - const CppType loc = value - min_value_; - const bool is_bit_set = - (bit_array_[loc >> 3u].load(std::memory_order_relaxed) & (1u << (loc & 7u))) != 0; + const bool is_bit_set = bit_vector_.getBit(value - min_value_); if (is_anti_filter) { return !is_bit_set; @@ -190,7 +175,7 @@ class BitVectorExactFilter : public LIPFilter { const CppType min_value_; const CppType max_value_; - alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_; + BarrieredReadWriteConcurrentBitVector bit_vector_; DISALLOW_COPY_AND_ASSIGN(BitVectorExactFilter); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/utility/lip_filter/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt index edd0d24..519d3e9 100644 --- a/utility/lip_filter/CMakeLists.txt +++ b/utility/lip_filter/CMakeLists.txt @@ -35,12 +35,12 @@ add_library(quickstep_utility_lipfilter_SingleIdentityHashFilter ../../empty_src target_link_libraries(quickstep_utility_lipfilter_BitVectorExactFilter quickstep_catalog_CatalogTypedefs quickstep_storage_StorageBlockInfo - quickstep_storage_StorageConstants quickstep_storage_ValueAccessor quickstep_storage_ValueAccessorUtil quickstep_types_Type - quickstep_utility_lipfilter_LIPFilter - 
quickstep_utility_Macros) + quickstep_utility_BarrieredReadWriteConcurrentBitVector + quickstep_utility_Macros + quickstep_utility_lipfilter_LIPFilter) target_link_libraries(quickstep_utility_lipfilter_LIPFilter quickstep_catalog_CatalogTypedefs quickstep_storage_StorageBlockInfo @@ -79,9 +79,9 @@ target_link_libraries(quickstep_utility_lipfilter_LIPFilter_proto target_link_libraries(quickstep_utility_lipfilter_SingleIdentityHashFilter quickstep_catalog_CatalogTypedefs quickstep_storage_StorageBlockInfo - quickstep_storage_StorageConstants quickstep_storage_ValueAccessor quickstep_storage_ValueAccessorUtil quickstep_types_Type - quickstep_utility_lipfilter_LIPFilter - quickstep_utility_Macros) + quickstep_utility_BarrieredReadWriteConcurrentBitVector + quickstep_utility_Macros + quickstep_utility_lipfilter_LIPFilter) http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a28b1e4d/utility/lip_filter/SingleIdentityHashFilter.hpp ---------------------------------------------------------------------- diff --git a/utility/lip_filter/SingleIdentityHashFilter.hpp b/utility/lip_filter/SingleIdentityHashFilter.hpp index 5c0e8a2..d7e3475 100644 --- a/utility/lip_filter/SingleIdentityHashFilter.hpp +++ b/utility/lip_filter/SingleIdentityHashFilter.hpp @@ -20,18 +20,15 @@ #ifndef QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_ #define QUICKSTEP_UTILITY_LIP_FILTER_SINGLE_IDENTITY_HASH_FILTER_HPP_ -#include <atomic> #include <cstddef> -#include <cstdint> -#include <cstring> #include <vector> #include "catalog/CatalogTypedefs.hpp" #include "storage/StorageBlockInfo.hpp" -#include "storage/StorageConstants.hpp" #include "storage/ValueAccessor.hpp" #include "storage/ValueAccessorUtil.hpp" #include "types/Type.hpp" +#include "utility/BarrieredReadWriteConcurrentBitVector.hpp" #include "utility/Macros.hpp" #include "utility/lip_filter/LIPFilter.hpp" @@ -65,11 +62,8 @@ class SingleIdentityHashFilter : public LIPFilter { explicit SingleIdentityHashFilter(const 
std::size_t filter_cardinality) : LIPFilter(LIPFilterType::kSingleIdentityHashFilter), filter_cardinality_(filter_cardinality), - bit_array_(GetByteSize(filter_cardinality)) { + bit_vector_(filter_cardinality) { DCHECK_GE(filter_cardinality, 1u); - std::memset(bit_array_.data(), - 0x0, - sizeof(std::atomic<std::uint8_t>) * GetByteSize(filter_cardinality)); } void insertValueAccessor(ValueAccessor *accessor, @@ -158,8 +152,9 @@ class SingleIdentityHashFilter : public LIPFilter { * @brief Inserts a given value into the hash filter. */ inline void insert(const void *key_begin) { - const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_; - bit_array_[hash >> 3u].fetch_or(1u << (hash & 7u), std::memory_order_relaxed); + const CppType hash = + *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_; + bit_vector_.setBit(hash); } /** @@ -168,12 +163,13 @@ class SingleIdentityHashFilter : public LIPFilter { * If false is returned, a value is certainly not present in the hash filter. */ inline bool contains(const void *key_begin) const { - const CppType hash = *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_; - return (bit_array_[hash >> 3u].load(std::memory_order_relaxed) & (1u << (hash & 7u))); + const CppType hash = + *reinterpret_cast<const CppType *>(key_begin) % filter_cardinality_; + return bit_vector_.getBit(hash); } std::size_t filter_cardinality_; - alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_; + BarrieredReadWriteConcurrentBitVector bit_vector_; DISALLOW_COPY_AND_ASSIGN(SingleIdentityHashFilter); };