New aggregation design.
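At a high level, this commit replaces the per-aggregation switch dispatch in CollisionFreeVectorTable with a templated AggFunc interface (AggFunc.hpp below): each aggregate defines per-argument-type state typedefs plus an atomic merge/finalize path (MergeArgAtomic / FinalizeAtomic) and a latch-protected one (MergeArgUnsafe / FinalizeUnsafe), selected at runtime by the new use_latch flag. As a minimal standalone sketch of the atomic SUM merge for floating-point states (function and variable names here are illustrative only, not the committed API):

#include <atomic>
#include <cstdint>

// Floating-point SUM has no std::atomic<double>::fetch_add before C++20, so
// the merge falls back to a compare-and-swap loop; integral SUM can use
// fetch_add with relaxed ordering directly.
inline void MergeSumDoubleAtomic(const double value, std::atomic<double> *state) {
  double expected = state->load(std::memory_order_relaxed);
  // On failure, compare_exchange_weak reloads 'expected' with the latest
  // published value, so 'value' is added exactly once when the CAS succeeds.
  while (!state->compare_exchange_weak(expected, expected + value)) {}
}

inline void MergeSumLongAtomic(const std::int64_t value,
                               std::atomic<std::int64_t> *state) {
  state->fetch_add(value, std::memory_order_relaxed);
}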
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/bc81c5b3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/bc81c5b3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/bc81c5b3 Branch: refs/heads/agg-expr Commit: bc81c5b3fb8eb4c4bbce67be8247000e959df90e Parents: 4be8e91 Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Wed Feb 22 13:58:08 2017 -0600 Committer: Jianqiao Zhu <jianq...@cs.wisc.edu> Committed: Thu Feb 23 22:03:55 2017 -0600 ---------------------------------------------------------------------- expressions/aggregation/AggFunc.hpp | 187 +++++++++ expressions/aggregation/CMakeLists.txt | 10 + query_optimizer/ExecutionGenerator.cpp | 5 +- storage/AggregationOperationState.cpp | 99 +---- storage/AggregationOperationState.hpp | 2 +- storage/CMakeLists.txt | 5 +- storage/CollisionFreeVectorTable.cpp | 478 +++++++++++++++------- storage/CollisionFreeVectorTable.hpp | 587 ++++++++-------------------- storage/PackedPayloadHashTable.cpp | 4 + storage/PackedPayloadHashTable.hpp | 60 +++ utility/BoolVector.hpp | 226 +++++++++++ utility/CMakeLists.txt | 5 + 12 files changed, 1007 insertions(+), 661 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/expressions/aggregation/AggFunc.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggFunc.hpp b/expressions/aggregation/AggFunc.hpp new file mode 100644 index 0000000..31f385e --- /dev/null +++ b/expressions/aggregation/AggFunc.hpp @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/ + +#ifndef QUICKSTEP_EXPRESSIONS_AGGREGATION_AGG_FUNC_HPP_ +#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGG_FUNC_HPP_ + +#include <atomic> +#include <cstddef> +#include <cstdint> +#include <type_traits> + +#include "expressions/aggregation/AggregationID.hpp" +#include "utility/Macros.hpp" +#include "types/IntType.hpp" +#include "types/LongType.hpp" +#include "types/FloatType.hpp" +#include "types/DoubleType.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +class ColumnVector; +class StorageManager; +class Type; + +/** \addtogroup Expressions + * @{ + */ + +struct InvalidType {}; + +template <typename T, typename U> +struct is_different : std::true_type {}; + +template <typename T> +struct is_different<T, T> : std::false_type {}; + +class Sum { + public: + Sum() {} + + template <typename ArgType> + struct AggState { + typedef InvalidType T; + typedef InvalidType AtomicT; + typedef InvalidType ResultT; + }; + + template <typename ArgType> + struct HasAtomicImpl : + is_different<InvalidType, + typename AggState<ArgType>::AtomicT> {}; + + template <typename ArgType> + inline static void MergeArgAtomic(const typename ArgType::cpptype &value, + typename AggState<ArgType>::AtomicT *state) { + LOG(FATAL) << "Not implemented"; + } + + template <typename ArgType> + inline static void FinalizeAtomic(const typename AggState<ArgType>::AtomicT &state, + typename AggState<ArgType>::ResultT *result) { + LOG(FATAL) << "Not implemented"; + } + + template <typename ArgType> + inline static void MergeArgUnsafe(const typename ArgType::cpptype &value, + typename AggState<ArgType>::T *state) { + *state += value; + } + + template <typename ArgType> + inline static void FinalizeUnsafe(const typename AggState<ArgType>::T &state, + typename AggState<ArgType>::ResultT *result) { + *result = state; + } + + private: + DISALLOW_COPY_AND_ASSIGN(Sum); +}; + +//------------------------------------------------------------------------------ +// Implementation of Sum for IntType +template <> +struct Sum::AggState<IntType> { + typedef std::int64_t T; + typedef std::atomic<std::int64_t> AtomicT; + typedef std::int64_t ResultT; +}; + +template <> +inline void Sum::MergeArgAtomic<IntType>(const IntType::cpptype &value, + AggState<IntType>::AtomicT *state) { + state->fetch_add(value, std::memory_order_relaxed); +} + +template <> +inline void Sum::FinalizeAtomic<IntType>(const AggState<IntType>::AtomicT &state, + AggState<IntType>::ResultT *result) { + *result = state.load(std::memory_order_relaxed); +} + +//------------------------------------------------------------------------------ +// Implementation of Sum for LongType +template <> +struct Sum::AggState<LongType> { + typedef std::int64_t T; + typedef std::atomic<std::int64_t> AtomicT; + typedef std::int64_t ResultT; +}; + +template <> +inline void Sum::MergeArgAtomic<LongType>(const LongType::cpptype &value, + AggState<LongType>::AtomicT *state) { + state->fetch_add(value, std::memory_order_relaxed); +} + +template <> +inline void Sum::FinalizeAtomic<LongType>(const AggState<LongType>::AtomicT &state, + AggState<LongType>::ResultT *result) { + *result = state.load(std::memory_order_relaxed); +} + +//------------------------------------------------------------------------------ +// Implementation of Sum for FloatType +template <> +struct Sum::AggState<FloatType> { + typedef double T; + typedef std::atomic<double> AtomicT; + typedef double ResultT; +}; + +template <> +inline void Sum::MergeArgAtomic<FloatType>(const FloatType::cpptype &value, + 
AggState<FloatType>::AtomicT *state) { + AggState<FloatType>::T state_val = state->load(std::memory_order_relaxed); + while (!state->compare_exchange_weak(state_val, state_val + value)) {} +} + +template <> +inline void Sum::FinalizeAtomic<FloatType>(const AggState<FloatType>::AtomicT &state, + AggState<FloatType>::ResultT *result) { + *result = state.load(std::memory_order_relaxed); +} + +//------------------------------------------------------------------------------ +// Implementation of Sum for DoubleType +template <> +struct Sum::AggState<DoubleType> { + typedef double T; + typedef std::atomic<double> AtomicT; + typedef double ResultT; +}; + +template <> +inline void Sum::MergeArgAtomic<DoubleType>(const DoubleType::cpptype &value, + AggState<DoubleType>::AtomicT *state) { + AggState<DoubleType>::T state_val = state->load(std::memory_order_relaxed); + while (!state->compare_exchange_weak(state_val, state_val + value)) {} +} + +template <> +inline void Sum::FinalizeAtomic<DoubleType>(const AggState<DoubleType>::AtomicT &state, + AggState<DoubleType>::ResultT *result) { + *result = state.load(std::memory_order_relaxed); +} + +} // namespace quickstep + +#endif // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGG_FUNC_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/expressions/aggregation/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt index 4220a8d..c0ebad7 100644 --- a/expressions/aggregation/CMakeLists.txt +++ b/expressions/aggregation/CMakeLists.txt @@ -20,6 +20,7 @@ QS_PROTOBUF_GENERATE_CPP(expressions_aggregation_AggregateFunction_proto_srcs AggregateFunction.proto) # Declare micro-libs: +add_library(quickstep_expressions_aggregation_AggFunc ../../empty_src.cpp AggFunc.hpp) add_library(quickstep_expressions_aggregation_AggregateFunction AggregateFunction.cpp AggregateFunction.hpp) @@ -69,6 +70,14 @@ add_library(quickstep_expressions_aggregation_AggregationID AggregationID.hpp) # Link dependencies: +target_link_libraries(quickstep_expressions_aggregation_AggFunc + glog + quickstep_expressions_aggregation_AggregationID + quickstep_types_DoubleType + quickstep_types_FloatType + quickstep_types_IntType + quickstep_types_LongType + quickstep_utility_Macros) target_link_libraries(quickstep_expressions_aggregation_AggregateFunction glog quickstep_expressions_aggregation_AggregateFunction_proto @@ -236,6 +245,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum # Submodule all-in-one library: add_library(quickstep_expressions_aggregation ../../empty_src.cpp) target_link_libraries(quickstep_expressions_aggregation + quickstep_expressions_aggregation_AggFunc quickstep_expressions_aggregation_AggregateFunction quickstep_expressions_aggregation_AggregateFunction_proto quickstep_expressions_aggregation_AggregateFunctionAvg http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index 70b69e0..19fc322 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -163,6 +163,8 @@ static const volatile bool aggregate_hashtable_type_dummy DEFINE_bool(parallelize_load, true, "Parallelize loading data files."); +DEFINE_bool(use_collision_free_agg, true, ""); + namespace E 
= ::quickstep::optimizer::expressions; namespace P = ::quickstep::optimizer::physical; namespace S = ::quickstep::serialization; @@ -1508,7 +1510,8 @@ void ExecutionGenerator::convertAggregate( cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan); std::size_t max_num_groups; - if (cost_model_for_aggregation_ + if (FLAGS_use_collision_free_agg && + cost_model_for_aggregation_ ->canUseCollisionFreeAggregation(physical_plan, estimated_num_groups, &max_num_groups)) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index 0f39b41..00bb433 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -67,7 +67,7 @@ DEFINE_int32(num_aggregation_partitions, 41, "The number of partitions used for performing the aggregation"); DEFINE_int32(partition_aggregation_num_groups_threshold, - 500000, + 100, "The threshold used for deciding whether the aggregation is done " "in a partitioned way or not"); @@ -208,13 +208,13 @@ AggregationOperationState::AggregationOperationState( group_by_handles, storage_manager)); } else if (is_aggregate_partitioned_) { - partitioned_group_by_hashtable_pool_.reset( - new PartitionedHashTablePool(estimated_num_entries, - FLAGS_num_aggregation_partitions, - hash_table_impl_type, - group_by_types_, - group_by_handles, - storage_manager)); + partitioned_hashtable_.reset( + AggregationStateHashTableFactory::CreateResizable( + hash_table_impl_type, + group_by_types_, + estimated_num_entries, + group_by_handles, + storage_manager_)); } else { group_by_hashtable_pool_.reset( new HashTablePool(estimated_num_entries, @@ -406,7 +406,8 @@ std::size_t AggregationOperationState::getNumFinalizationPartitions() const { return static_cast<CollisionFreeVectorTable *>( collision_free_hashtable_.get())->getNumFinalizationPartitions(); } else if (is_aggregate_partitioned_) { - return partitioned_group_by_hashtable_pool_->getNumPartitions(); + return static_cast<PackedPayloadHashTable *>( + partitioned_hashtable_.get())->getNumFinalizationPartitions(); } else { return 1u; } @@ -549,62 +550,11 @@ void AggregationOperationState::aggregateBlockHashTableImplCollisionFree( void AggregationOperationState::aggregateBlockHashTableImplPartitioned( const ValueAccessorMultiplexer &accessor_mux) { - DCHECK(partitioned_group_by_hashtable_pool_ != nullptr); - - std::vector<attribute_id> group_by_key_ids; - for (const MultiSourceAttributeId &key_id : group_by_key_ids_) { - DCHECK(key_id.source == ValueAccessorSource::kBase); - group_by_key_ids.emplace_back(key_id.attr_id); - } + DCHECK(partitioned_hashtable_ != nullptr); - InvokeOnValueAccessorMaybeTupleIdSequenceAdapter( - accessor_mux.getBaseAccessor(), - [&](auto *accessor) -> void { // NOLINT(build/c++11) - // TODO(jianqiao): handle the situation when keys in non_trivial_results - const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions(); - - // Compute the partitions for the tuple formed by group by values. - std::vector<std::unique_ptr<TupleIdSequence>> partition_membership; - partition_membership.resize(num_partitions); - - // Create a tuple-id sequence for each partition. 
- for (std::size_t partition = 0; partition < num_partitions; ++partition) { - partition_membership[partition].reset( - new TupleIdSequence(accessor->getEndPosition())); - } - - // Iterate over ValueAccessor for each tuple, - // set a bit in the appropriate TupleIdSequence. - while (accessor->next()) { - // We need a unique_ptr because getTupleWithAttributes() uses "new". - std::unique_ptr<Tuple> curr_tuple( - accessor->getTupleWithAttributes(group_by_key_ids)); - const std::size_t curr_tuple_partition_id = - curr_tuple->getTupleHash() % num_partitions; - partition_membership[curr_tuple_partition_id]->set( - accessor->getCurrentPosition(), true); - } - - // Aggregate each partition. - for (std::size_t partition = 0; partition < num_partitions; ++partition) { - std::unique_ptr<ValueAccessor> base_adapter( - accessor->createSharedTupleIdSequenceAdapter( - *partition_membership[partition])); - - std::unique_ptr<ValueAccessor> derived_adapter; - if (accessor_mux.getDerivedAccessor() != nullptr) { - derived_adapter.reset( - accessor_mux.getDerivedAccessor()->createSharedTupleIdSequenceAdapterVirtual( - *partition_membership[partition])); - } - - ValueAccessorMultiplexer local_mux(base_adapter.get(), derived_adapter.get()); - partitioned_group_by_hashtable_pool_->getHashTable(partition) - ->upsertValueAccessorCompositeKey(argument_ids_, - group_by_key_ids_, - local_mux); - } - }); + partitioned_hashtable_->upsertValueAccessorCompositeKey(argument_ids_, + group_by_key_ids_, + accessor_mux); } void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate( @@ -712,20 +662,18 @@ void AggregationOperationState::finalizeHashTableImplPartitioned( const std::size_t partition_id, InsertDestination *output_destination) { PackedPayloadHashTable *hash_table = - static_cast<PackedPayloadHashTable *>( - partitioned_group_by_hashtable_pool_->getHashTable(partition_id)); + static_cast<PackedPayloadHashTable *>(partitioned_hashtable_.get()); // Each element of 'group_by_keys' is a vector of values for a particular // group (which is also the prefix of the finalized Tuple for that group). std::vector<std::vector<TypedValue>> group_by_keys; if (handles_.empty()) { - const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key, - const std::uint8_t *dumb_placeholder) -> void { + hash_table->forEachCompositeKeyInPartition( + partition_id, + [&](std::vector<TypedValue> &group_by_key) -> void { group_by_keys.emplace_back(std::move(group_by_key)); - }; - - hash_table->forEachCompositeKey(&keys_retriever); + }); } // Collect per-aggregate finalized values. @@ -737,15 +685,8 @@ void AggregationOperationState::finalizeHashTableImplPartitioned( final_values.emplace_back(agg_result_col); } } - hash_table->destroyPayload(); +// hash_table->destroyPayload(); - // Reorganize 'group_by_keys' in column-major order so that we can make a - // ColumnVectorsValueAccessor to bulk-insert results. - // - // TODO(chasseur): Shuffling around the GROUP BY keys like this is suboptimal - // if there is only one aggregate. The need to do this should hopefully go - // away when we work out storing composite structures for multiple aggregates - // in a single HashTable. 
std::vector<std::unique_ptr<ColumnVector>> group_by_cvs; std::size_t group_by_element_idx = 0; for (const Type *group_by_type : group_by_types_) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index c8930ee..a75f243 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -296,7 +296,7 @@ class AggregationOperationState { // A vector of group by hash table pools. std::unique_ptr<HashTablePool> group_by_hashtable_pool_; - std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_; + std::unique_ptr<AggregationStateHashTableBase> partitioned_hashtable_; std::unique_ptr<AggregationStateHashTableBase> collision_free_hashtable_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index 293be17..fcc069b 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -436,7 +436,9 @@ if(QUICKSTEP_HAVE_BITWEAVING) endif() # CMAKE_VALIDATE_IGNORE_END target_link_libraries(quickstep_storage_CollisionFreeVectorTable + ${GFLAGS_LIB_NAME} quickstep_catalog_CatalogTypedefs + quickstep_expressions_aggregation_AggFunc quickstep_expressions_aggregation_AggregationHandle quickstep_expressions_aggregation_AggregationID quickstep_storage_HashTableBase @@ -447,11 +449,12 @@ target_link_libraries(quickstep_storage_CollisionFreeVectorTable quickstep_storage_ValueAccessor quickstep_storage_ValueAccessorMultiplexer quickstep_storage_ValueAccessorUtil + quickstep_threading_SpinMutex quickstep_types_Type quickstep_types_TypeID quickstep_types_containers_ColumnVector quickstep_types_containers_ColumnVectorsValueAccessor - quickstep_utility_BarrieredReadWriteConcurrentBitVector + quickstep_utility_BoolVector quickstep_utility_Macros) target_link_libraries(quickstep_storage_ColumnStoreUtil quickstep_catalog_CatalogAttribute http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/CollisionFreeVectorTable.cpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp index d836014..c92f0ab 100644 --- a/storage/CollisionFreeVectorTable.cpp +++ b/storage/CollisionFreeVectorTable.cpp @@ -24,6 +24,7 @@ #include <cstdint> #include <cstdlib> #include <memory> +#include <type_traits> #include <vector> #include "expressions/aggregation/AggregationHandle.hpp" @@ -33,13 +34,175 @@ #include "storage/ValueAccessor.hpp" #include "storage/ValueAccessorMultiplexer.hpp" #include "storage/ValueAccessorUtil.hpp" +#include "threading/SpinMutex.hpp" +#include "types/TypeID.hpp" #include "types/containers/ColumnVectorsValueAccessor.hpp" -#include "utility/BarrieredReadWriteConcurrentBitVector.hpp" +#include "utility/BoolVector.hpp" #include "glog/logging.h" namespace quickstep { +DEFINE_uint64(vt_threadprivate_threshold, 1000000L, ""); +DEFINE_bool(use_latch, false, ""); + +namespace { + +template <typename T> +using remove_const_reference_t = std::remove_const_t<std::remove_reference_t<T>>; + +template <typename FunctorT> +inline auto InvokeOnKeyType(const Type &type, + const FunctorT &functor) { + switch (type.getTypeID()) { + case TypeID::kInt: + return 
functor(static_cast<const IntType&>(type)); + case TypeID::kLong: + return functor(static_cast<const LongType&>(type)); + default: + LOG(FATAL) << "Not supported"; + } +} + +template <typename FunctorT> +inline auto InvokeOnType(const Type &type, + const FunctorT &functor) { + switch (type.getTypeID()) { + case TypeID::kInt: + return functor(static_cast<const IntType&>(type)); + case TypeID::kLong: + return functor(static_cast<const LongType&>(type)); + case TypeID::kFloat: + return functor(static_cast<const FloatType&>(type)); + case TypeID::kDouble: + return functor(static_cast<const DoubleType&>(type)); + default: + LOG(FATAL) << "Not supported"; + } +} + +template <typename FunctorT> +inline auto InvokeOnBool(const bool &val, + const FunctorT &functor) { + if (val) { + return functor(std::true_type()); + } else { + return functor(std::false_type()); + } +} + +template <typename FunctorT> +inline auto InvokeOnBools(const bool &val1, + const bool &val2, + const FunctorT &functor) { + if (val1) { + if (val2) { + return functor(std::true_type(), std::true_type()); + } else { + return functor(std::true_type(), std::false_type()); + } + } else { + if (val2) { + return functor(std::false_type(), std::true_type()); + } else { + return functor(std::false_type(), std::false_type()); + } + } +} + +template <typename FunctorT> +inline auto InvokeOnAggFunc(const AggregationID &agg_id, + const FunctorT &functor) { + switch (agg_id) { + case AggregationID::kSum: { + return functor(Sum()); + } + default: + LOG(FATAL) << "Not supported"; + } +} + +template <typename FunctorT> +inline auto InvokeIf(const std::true_type &val, + const FunctorT &functor) { + return functor(); +} + +template <typename FunctorT> +inline void InvokeIf(const std::false_type &val, + const FunctorT &functor) { +} + +//template <typename FunctorT> +//inline void InvokeOnAggFuncIfApplicableToArgType( +// const AggregationID &agg_id, +// const Type &arg_type, +// const FunctorT &functor) { +// InvokeOnAggFunc( +// agg_id, +// [&](const auto &agg_func) -> void { +// InvokeOnType( +// arg_type, +// [&](const auto &arg_type) -> void { +// using AggFuncT = std::remove_reference_t<decltype(agg_func)>; +// using ArgT = remove_const_reference_t<decltype(arg_type)>; +// +// InvokeIf( +// typename AggFuncT::template HasAtomicImpl<ArgT>(), +// [&]() -> void { +// functor(agg_func, arg_type); +// }); +// }); +// }); +//} + +template <typename FunctorT> +inline void InvokeOnAggFuncWithArgType( + const AggregationID &agg_id, + const Type &arg_type, + const FunctorT &functor) { + InvokeOnAggFunc( + agg_id, + [&](const auto &agg_func) -> void { + InvokeOnType( + arg_type, + [&](const auto &arg_type) -> void { + functor(agg_func, arg_type); + }); + }); +} + +template <typename FunctorT> +inline auto InvokeOnTwoAccessors( + const ValueAccessorMultiplexer &accessor_mux, + const ValueAccessorSource &first_source, + const ValueAccessorSource &second_source, + const FunctorT &functor) { + ValueAccessor *base_accessor = accessor_mux.getBaseAccessor(); + ColumnVectorsValueAccessor *derived_accessor = + static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor()); + + InvokeOnAnyValueAccessor( + base_accessor, + [&](auto *accessor) { + if (first_source == ValueAccessorSource::kBase) { + if (second_source == ValueAccessorSource::kBase) { + return functor(std::false_type(), accessor, accessor); + } else { + return functor(std::true_type(), accessor, derived_accessor); + } + } else { + if (second_source == ValueAccessorSource::kBase) { + 
return functor(std::true_type(), derived_accessor, accessor); + } else { + return functor(std::false_type(), derived_accessor, derived_accessor); + } + } + }); +} + +} // namespace + CollisionFreeVectorTable::CollisionFreeVectorTable( const Type *key_type, const std::size_t num_entries, @@ -49,46 +212,45 @@ CollisionFreeVectorTable::CollisionFreeVectorTable( num_entries_(num_entries), num_handles_(handles.size()), handles_(handles), + use_thread_private_existence_map_(num_entries_ < FLAGS_vt_threadprivate_threshold), num_finalize_partitions_(CalculateNumFinalizationPartitions(num_entries_)), storage_manager_(storage_manager) { DCHECK_GT(num_entries, 0u); std::size_t required_memory = 0; const std::size_t existence_map_offset = 0; + std::size_t mutex_vec_offset = 0; std::vector<std::size_t> state_offsets; - required_memory += CacheLineAlignedBytes( - BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries)); + if (!use_thread_private_existence_map_) { + required_memory += CacheLineAlignedBytes( + BarrieredReadWriteConcurrentBoolVector::BytesNeeded(num_entries)); + } + + if (FLAGS_use_latch) { + mutex_vec_offset = required_memory; + required_memory += CacheLineAlignedBytes(num_entries * sizeof(SpinMutex)); + } for (std::size_t i = 0; i < num_handles_; ++i) { const AggregationHandle *handle = handles_[i]; const std::vector<const Type *> argument_types = handle->getArgumentTypes(); + DCHECK_EQ(1u, argument_types.size()); std::size_t state_size = 0; - switch (handle->getAggregationID()) { - case AggregationID::kCount: { - state_size = sizeof(std::atomic<std::size_t>); - break; - } - case AggregationID::kSum: { - DCHECK_EQ(1u, argument_types.size()); - switch (argument_types.front()->getTypeID()) { - case TypeID::kInt: // Fall through - case TypeID::kLong: - state_size = sizeof(std::atomic<std::int64_t>); - break; - case TypeID::kFloat: // Fall through - case TypeID::kDouble: - state_size = sizeof(std::atomic<double>); - break; - default: - LOG(FATAL) << "Not implemented"; - } - break; + InvokeOnAggFuncWithArgType( + handle->getAggregationID(), + *argument_types.front(), + [&](const auto &agg_func, const auto &arg_type) { + using AggFuncT = std::remove_reference_t<decltype(agg_func)>; + using ArgT = remove_const_reference_t<decltype(arg_type)>; + + if (FLAGS_use_latch) { + state_size = sizeof(typename AggFuncT::template AggState<ArgT>::T); + } else { + state_size = sizeof(typename AggFuncT::template AggState<ArgT>::AtomicT); } - default: - LOG(FATAL) << "Not implemented"; - } + }); state_offsets.emplace_back(required_memory); required_memory += CacheLineAlignedBytes(state_size * num_entries); @@ -101,10 +263,21 @@ CollisionFreeVectorTable::CollisionFreeVectorTable( blob_ = storage_manager_->getBlobMutable(blob_id); void *memory_start = blob_->getMemoryMutable(); - existence_map_.reset(new BarrieredReadWriteConcurrentBitVector( - reinterpret_cast<char *>(memory_start) + existence_map_offset, - num_entries, - false /* initialize */)); + if (use_thread_private_existence_map_) { + thread_private_existence_map_pool_.reset(new BoolVectorPool(num_entries)); + } else { + concurrent_existence_map_.reset(new BarrieredReadWriteConcurrentBoolVector( + reinterpret_cast<char *>(memory_start) + existence_map_offset, + num_entries, + false /* initialize */)); + } + + if (FLAGS_use_latch) { + mutex_vec_ = reinterpret_cast<SpinMutex *>( + reinterpret_cast<char *>(memory_start) + mutex_vec_offset); + } else { + mutex_vec_ = nullptr; + } for (std::size_t i = 0; i < num_handles_; ++i) { // Columnwise layout. 
@@ -132,113 +305,103 @@ bool CollisionFreeVectorTable::upsertValueAccessorCompositeKey( DCHECK_EQ(1u, key_ids.size()); if (handles_.empty()) { - InvokeOnValueAccessorMaybeTupleIdSequenceAdapter( - accessor_mux.getValueAccessorBySource(key_ids.front().source), - [&key_ids, this](auto *accessor) -> void { // NOLINT(build/c++11) - this->upsertValueAccessorKeyOnlyHelper(key_type_->isNullable(), - key_type_, - key_ids.front().attr_id, - accessor); - }); - return true; + LOG(FATAL) << "Not implemented"; } - DCHECK(accessor_mux.getDerivedAccessor() == nullptr || - accessor_mux.getDerivedAccessor()->getImplementationType() - == ValueAccessor::Implementation::kColumnVectors); + const ValueAccessorSource key_source = key_ids.front().source; + const attribute_id key_id = key_ids.front().attr_id; + const bool is_key_nullable = key_type_->isNullable(); - ValueAccessor *base_accessor = accessor_mux.getBaseAccessor(); - ColumnVectorsValueAccessor *derived_accesor = - static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor()); + for (std::size_t i = 0; i < num_handles_; ++i) { + DCHECK_LE(argument_ids[i].size(), 1u); - // Dispatch to specialized implementations to achieve maximum performance. - InvokeOnValueAccessorMaybeTupleIdSequenceAdapter( - base_accessor, - [&argument_ids, &key_ids, &derived_accesor, this](auto *accessor) -> void { // NOLINT(build/c++11) - const ValueAccessorSource key_source = key_ids.front().source; - const attribute_id key_id = key_ids.front().attr_id; - const bool is_key_nullable = key_type_->isNullable(); - - for (std::size_t i = 0; i < num_handles_; ++i) { - DCHECK_LE(argument_ids[i].size(), 1u); - - const AggregationHandle *handle = handles_[i]; - const auto &argument_types = handle->getArgumentTypes(); - const auto &argument_ids_i = argument_ids[i]; - - ValueAccessorSource argument_source; - attribute_id argument_id; - const Type *argument_type; - bool is_argument_nullable; - - if (argument_ids_i.empty()) { - argument_source = ValueAccessorSource::kInvalid; - argument_id = kInvalidAttributeID; - - DCHECK(argument_types.empty()); - argument_type = nullptr; - is_argument_nullable = false; - } else { - DCHECK_EQ(1u, argument_ids_i.size()); - argument_source = argument_ids_i.front().source; - argument_id = argument_ids_i.front().attr_id; + const AggregationHandle *handle = handles_[i]; + const auto &argument_types = handle->getArgumentTypes(); + const auto &argument_ids_i = argument_ids[i]; + + ValueAccessorSource argument_source; + attribute_id argument_id; + const Type *argument_type; + bool is_argument_nullable; + + if (argument_ids_i.empty()) { +// argument_source = ValueAccessorSource::kInvalid; +// argument_id = kInvalidAttributeID; +// +// DCHECK(argument_types.empty()); +// argument_type = nullptr; +// is_argument_nullable = false; + LOG(FATAL) << "Not supported"; + } else { + DCHECK_EQ(1u, argument_ids_i.size()); + argument_source = argument_ids_i.front().source; + argument_id = argument_ids_i.front().attr_id; + + DCHECK_EQ(1u, argument_types.size()); + argument_type = argument_types.front(); + is_argument_nullable = argument_type->isNullable(); + } - DCHECK_EQ(1u, argument_types.size()); - argument_type = argument_types.front(); - is_argument_nullable = argument_type->isNullable(); - } + InvokeOnAggFuncWithArgType( + handle->getAggregationID(), + *argument_types.front(), + [&](const auto &agg_func, const auto &arg_type) { + using AggFuncT = std::remove_reference_t<decltype(agg_func)>; + using ArgT = remove_const_reference_t<decltype(arg_type)>; + + 
InvokeOnKeyType( + *key_type_, + [&](const auto &key_type) -> void { + using KeyT = remove_const_reference_t<decltype(key_type)>; + + InvokeOnBools( + is_key_nullable, + is_argument_nullable, + [&](const auto &is_key_nullable, + const auto &is_argument_nullable) -> void { + using KeyNullableT = + remove_const_reference_t<decltype(is_key_nullable)>; + using ArgNullableT = + remove_const_reference_t<decltype(is_argument_nullable)>; + + InvokeOnTwoAccessors( + accessor_mux, + key_source, + argument_source, + [&](const auto &use_two_accessors, + auto *key_accessor, + auto *argument_accessor) { + using UseTwoAccessorsT = + remove_const_reference_t<decltype(use_two_accessors)>; + + invokeOnExistenceMap( + [&](auto *existence_map) -> void { + if (FLAGS_use_latch) { + upsertValueAccessorInternalUnaryLatch< + AggFuncT, KeyT, ArgT, + KeyNullableT::value, ArgNullableT::value, UseTwoAccessorsT::value>( key_id, + argument_id, + vec_tables_[i], + existence_map, + key_accessor, + argument_accessor); + } else { + upsertValueAccessorInternalUnaryAtomic< + AggFuncT, KeyT, ArgT, + KeyNullableT::value, ArgNullableT::value, UseTwoAccessorsT::value>( key_id, + argument_id, + vec_tables_[i], + existence_map, + key_accessor, + argument_accessor); + } + }); + }); + }); + }); + }); + } - if (key_source == ValueAccessorSource::kBase) { - if (argument_source == ValueAccessorSource::kBase) { - this->upsertValueAccessorDispatchHelper<false>(is_key_nullable, - is_argument_nullable, - key_type_, - argument_type, - handle->getAggregationID(), - key_id, - argument_id, - vec_tables_[i], - accessor, - accessor); - } else { - this->upsertValueAccessorDispatchHelper<true>(is_key_nullable, - is_argument_nullable, - key_type_, - argument_type, - handle->getAggregationID(), - key_id, - argument_id, - vec_tables_[i], - accessor, - derived_accesor); - } - } else { - if (argument_source == ValueAccessorSource::kBase) { - this->upsertValueAccessorDispatchHelper<true>(is_key_nullable, - is_argument_nullable, - key_type_, - argument_type, - handle->getAggregationID(), - key_id, - argument_id, - vec_tables_[i], - derived_accesor, - accessor); - } else { - this->upsertValueAccessorDispatchHelper<false>(is_key_nullable, - is_argument_nullable, - key_type_, - argument_type, - handle->getAggregationID(), - key_id, - argument_id, - vec_tables_[i], - derived_accesor, - derived_accesor); - } - } - } - }); return true; } @@ -249,16 +412,17 @@ void CollisionFreeVectorTable::finalizeKey(const std::size_t partition_id, const std::size_t end_position = calculatePartitionEndPosition(partition_id); - switch (key_type_->getTypeID()) { - case TypeID::kInt: - finalizeKeyInternal<int>(start_position, end_position, output_cv); - return; - case TypeID::kLong: - finalizeKeyInternal<std::int64_t>(start_position, end_position, output_cv); - return; - default: - LOG(FATAL) << "Not supported"; - } + InvokeOnKeyType( + *key_type_, + [&](const auto &key_type) { + using KeyT = remove_const_reference_t<decltype(key_type)>; + + invokeOnExistenceMapFinal( + [&](const auto *existence_map) -> void { + finalizeKeyInternal<typename KeyT::cpptype>( + start_position, end_position, existence_map, output_cv); + }); + }); } void CollisionFreeVectorTable::finalizeState(const std::size_t partition_id, @@ -274,12 +438,32 @@ void CollisionFreeVectorTable::finalizeState(const std::size_t partition_id, const Type *argument_type = argument_types.empty() ? 
nullptr : argument_types.front(); - finalizeStateDispatchHelper(handle->getAggregationID(), - argument_type, - vec_tables_[handle_id], - start_position, - end_position, - output_cv); + DCHECK(argument_type != nullptr); + + InvokeOnAggFuncWithArgType( + handle->getAggregationID(), + *argument_type, + [&](const auto &agg_func, const auto &arg_type) { + using AggFuncT = std::remove_reference_t<decltype(agg_func)>; + using ArgT = remove_const_reference_t<decltype(arg_type)>; + + invokeOnExistenceMapFinal( + [&](const auto *existence_map) -> void { + if (FLAGS_use_latch) { + finalizeStateInternalLatch<AggFuncT, ArgT>(start_position, + end_position, + vec_tables_[handle_id], + existence_map, + output_cv); + } else { + finalizeStateInternalAtomic<AggFuncT, ArgT>(start_position, + end_position, + vec_tables_[handle_id], + existence_map, + output_cv); + } + }); + }); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/CollisionFreeVectorTable.hpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp index 772d47d..79020fb 100644 --- a/storage/CollisionFreeVectorTable.hpp +++ b/storage/CollisionFreeVectorTable.hpp @@ -29,22 +29,27 @@ #include <vector> #include "catalog/CatalogTypedefs.hpp" +#include "expressions/aggregation/AggFunc.hpp" #include "expressions/aggregation/AggregationID.hpp" #include "storage/HashTableBase.hpp" #include "storage/StorageBlob.hpp" #include "storage/StorageConstants.hpp" #include "storage/ValueAccessorMultiplexer.hpp" +#include "threading/SpinMutex.hpp" #include "types/Type.hpp" #include "types/TypeID.hpp" #include "types/containers/ColumnVector.hpp" -#include "utility/BarrieredReadWriteConcurrentBitVector.hpp" +#include "utility/BoolVector.hpp" #include "utility/Macros.hpp" +#include "gflags/gflags.h" + #include "glog/logging.h" namespace quickstep { class AggregationHandle; +class BarrieredReadWriteConcurrentBitVector; class StorageManager; /** \addtogroup Storage @@ -101,7 +106,17 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase { calculatePartitionStartPosition(partition_id); const std::size_t end_position = calculatePartitionEndPosition(partition_id); - return existence_map_->onesCountInRange(start_position, end_position); + + if (use_thread_private_existence_map_) { + auto &bool_vectors = thread_private_existence_map_pool_->getAll(); + auto &target_bv = bool_vectors.front(); + for (std::size_t i = 1; i < bool_vectors.size(); ++i) { + target_bv->unionWith(*bool_vectors[i], start_position, end_position); + } + return target_bv->onesCountInRange(start_position, end_position); + } else { + return concurrent_existence_map_->onesCountInRange(start_position, end_position); + } } /** @@ -110,7 +125,7 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase { * @return The existence map for this vector table. 
*/ inline BarrieredReadWriteConcurrentBitVector* getExistenceMap() const { - return existence_map_.get(); + return nullptr; } /** @@ -214,115 +229,67 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase { num_entries_); } - template <bool use_two_accessors, typename ...ArgTypes> - inline void upsertValueAccessorDispatchHelper( - const bool is_key_nullable, - const bool is_argument_nullable, - ArgTypes &&...args); + template <typename FunctorT> + inline void invokeOnExistenceMap(const FunctorT &functor) { + if (use_thread_private_existence_map_) { + BoolVector *existence_map = thread_private_existence_map_pool_->checkOut(); + functor(existence_map); + thread_private_existence_map_pool_->checkIn(existence_map); + } else { + functor(concurrent_existence_map_.get()); + } + } - template <bool ...bool_values, typename ...ArgTypes> - inline void upsertValueAccessorDispatchHelper( - const Type *key_type, - ArgTypes &&...args); - - template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename ...ArgTypes> - inline void upsertValueAccessorDispatchHelper( - const Type *argument_type, - const AggregationID agg_id, - ArgTypes &&...args); - - template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> - inline void upsertValueAccessorCountHelper( - const attribute_id key_attr_id, - const attribute_id argument_id, - void *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor); - - template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> - inline void upsertValueAccessorSumHelper( - const Type *argument_type, - const attribute_id key_attr_id, - const attribute_id argument_id, - void *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor); - - template <typename ...ArgTypes> - inline void upsertValueAccessorKeyOnlyHelper( - const bool is_key_nullable, - const Type *key_type, - ArgTypes &&...args); - - template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT> - inline void upsertValueAccessorKeyOnly( - const attribute_id key_attr_id, - KeyValueAccessorT *key_accessor); - - template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT> - inline void upsertValueAccessorCountNullary( - const attribute_id key_attr_id, - std::atomic<std::size_t> *vec_table, - KeyValueAccessorT *key_accessor); - - template <bool use_two_accessors, bool is_key_nullable, typename KeyT, - typename KeyValueAccessorT, typename ArgumentValueAccessorT> - inline void upsertValueAccessorCountUnary( - const attribute_id key_attr_id, - const attribute_id argument_id, - std::atomic<std::size_t> *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor); - - template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename ArgumentT, typename StateT, - typename KeyValueAccessorT, typename ArgumentValueAccessorT> - inline void upsertValueAccessorIntegerSum( - const attribute_id key_attr_id, - const attribute_id argument_id, - std::atomic<StateT> *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor); - - template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename ArgumentT, typename StateT, - typename 
KeyValueAccessorT, typename ArgumentValueAccessorT> - inline void upsertValueAccessorGenericSum( - const attribute_id key_attr_id, - const attribute_id argument_id, - std::atomic<StateT> *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor); - - template <typename KeyT> + template <typename FunctorT> + inline void invokeOnExistenceMapFinal(const FunctorT &functor) const { + if (use_thread_private_existence_map_) { + const BoolVector *existence_map = + thread_private_existence_map_pool_->getAll().front().get(); + functor(existence_map); + } else { + functor(concurrent_existence_map_.get()); + } + } + + template <typename AggFuncT, typename KeyT, typename ArgT, + bool is_key_nullable, bool is_argument_nullable, bool use_two_accessors, + typename KeyAccessorT, typename ArgAccessorT, typename BoolVectorT> + inline void upsertValueAccessorInternalUnaryAtomic(const attribute_id key_attr_id, + const attribute_id argument_id, + void *vec_table, + BoolVectorT *existence_map, + KeyAccessorT *key_accessor, + ArgAccessorT *argument_accessor); + + template <typename AggFuncT, typename KeyT, typename ArgT, + bool is_key_nullable, bool is_argument_nullable, bool use_two_accessors, + typename KeyAccessorT, typename ArgAccessorT, typename BoolVectorT> + inline void upsertValueAccessorInternalUnaryLatch(const attribute_id key_attr_id, + const attribute_id argument_id, + void *vec_table, + BoolVectorT *existence_map, + KeyAccessorT *key_accessor, + ArgAccessorT *argument_accessor); + + template <typename KeyT, typename BoolVectorT> inline void finalizeKeyInternal(const std::size_t start_position, const std::size_t end_position, + BoolVectorT *existence_map, NativeColumnVector *output_cv) const; - template <typename ...ArgTypes> - inline void finalizeStateDispatchHelper(const AggregationID agg_id, - const Type *argument_type, + template <typename AggFuncT, typename ArgT, typename BoolVectorT> + inline void finalizeStateInternalAtomic(const std::size_t start_position, + const std::size_t end_position, const void *vec_table, - ArgTypes &&...args) const; - - template <typename ...ArgTypes> - inline void finalizeStateSumHelper(const Type *argument_type, - const void *vec_table, - ArgTypes &&...args) const; - - inline void finalizeStateCount(const std::atomic<std::size_t> *vec_table, - const std::size_t start_position, - const std::size_t end_position, - NativeColumnVector *output_cv) const; + BoolVectorT *existence_map, + NativeColumnVector *output_cv) const; - template <typename ResultT, typename StateT> - inline void finalizeStateSum(const std::atomic<StateT> *vec_table, - const std::size_t start_position, - const std::size_t end_position, - NativeColumnVector *output_cv) const; + template <typename AggFuncT, typename ArgT, typename BoolVectorT> + inline void finalizeStateInternalLatch(const std::size_t start_position, + const std::size_t end_position, + const void *vec_table, + BoolVectorT *existence_map, + NativeColumnVector *output_cv) const; const Type *key_type_; const std::size_t num_entries_; @@ -330,8 +297,12 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase { const std::size_t num_handles_; const std::vector<AggregationHandle *> handles_; - std::unique_ptr<BarrieredReadWriteConcurrentBitVector> existence_map_; + const bool use_thread_private_existence_map_; + std::unique_ptr<BarrieredReadWriteConcurrentBoolVector> concurrent_existence_map_; + std::unique_ptr<BoolVectorPool> thread_private_existence_map_pool_; + std::vector<void *> 
vec_tables_; + SpinMutex *mutex_vec_; const std::size_t num_finalize_partitions_; @@ -347,392 +318,144 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase { // ---------------------------------------------------------------------------- // Implementations of template methods follow. -template <bool use_two_accessors, typename ...ArgTypes> -inline void CollisionFreeVectorTable - ::upsertValueAccessorDispatchHelper(const bool is_key_nullable, - const bool is_argument_nullable, - ArgTypes &&...args) { - if (is_key_nullable) { - if (is_argument_nullable) { - upsertValueAccessorDispatchHelper<use_two_accessors, true, true>( - std::forward<ArgTypes>(args)...); - } else { - upsertValueAccessorDispatchHelper<use_two_accessors, true, false>( - std::forward<ArgTypes>(args)...); - } - } else { - if (is_argument_nullable) { - upsertValueAccessorDispatchHelper<use_two_accessors, false, true>( - std::forward<ArgTypes>(args)...); - } else { - upsertValueAccessorDispatchHelper<use_two_accessors, false, false>( - std::forward<ArgTypes>(args)...); - } - } -} - -template <bool ...bool_values, typename ...ArgTypes> -inline void CollisionFreeVectorTable - ::upsertValueAccessorDispatchHelper(const Type *key_type, - ArgTypes &&...args) { - switch (key_type->getTypeID()) { - case TypeID::kInt: - upsertValueAccessorDispatchHelper<bool_values..., int>( - std::forward<ArgTypes>(args)...); - return; - case TypeID::kLong: - upsertValueAccessorDispatchHelper<bool_values..., std::int64_t>( - std::forward<ArgTypes>(args)...); - return; - default: - LOG(FATAL) << "Not supported"; - } -} - -template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename ...ArgTypes> -inline void CollisionFreeVectorTable - ::upsertValueAccessorDispatchHelper(const Type *argument_type, - const AggregationID agg_id, - ArgTypes &&...args) { - switch (agg_id) { - case AggregationID::kCount: - upsertValueAccessorCountHelper< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>( - std::forward<ArgTypes>(args)...); - return; - case AggregationID::kSum: - upsertValueAccessorSumHelper< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>( - argument_type, std::forward<ArgTypes>(args)...); - return; - default: - LOG(FATAL) << "Not supported"; - } -} - -template <typename ...ArgTypes> -inline void CollisionFreeVectorTable - ::upsertValueAccessorKeyOnlyHelper(const bool is_key_nullable, - const Type *key_type, - ArgTypes &&...args) { - switch (key_type->getTypeID()) { - case TypeID::kInt: { - if (is_key_nullable) { - upsertValueAccessorKeyOnly<true, int>(std::forward<ArgTypes>(args)...); - } else { - upsertValueAccessorKeyOnly<false, int>(std::forward<ArgTypes>(args)...); - } - return; - } - case TypeID::kLong: { - if (is_key_nullable) { - upsertValueAccessorKeyOnly<true, std::int64_t>(std::forward<ArgTypes>(args)...); - } else { - upsertValueAccessorKeyOnly<false, std::int64_t>(std::forward<ArgTypes>(args)...); - } - return; - } - default: - LOG(FATAL) << "Not supported"; - } -} - -template <bool is_key_nullable, typename KeyT, typename ValueAccessorT> -inline void CollisionFreeVectorTable - ::upsertValueAccessorKeyOnly(const attribute_id key_attr_id, - ValueAccessorT *accessor) { - accessor->beginIteration(); - while (accessor->next()) { - const KeyT *key = static_cast<const KeyT *>( - accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); - if (is_key_nullable && key == nullptr) { - continue; - } - existence_map_->setBit(*key); - } -} - -template 
<bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> -inline void CollisionFreeVectorTable - ::upsertValueAccessorCountHelper(const attribute_id key_attr_id, - const attribute_id argument_id, - void *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor) { - DCHECK_GE(key_attr_id, 0); - - if (is_argument_nullable && argument_id != kInvalidAttributeID) { - upsertValueAccessorCountUnary<use_two_accessors, is_key_nullable, KeyT>( - key_attr_id, - argument_id, - static_cast<std::atomic<std::size_t> *>(vec_table), - key_accessor, - argument_accessor); - return; - } else { - upsertValueAccessorCountNullary<is_key_nullable, KeyT>( - key_attr_id, - static_cast<std::atomic<std::size_t> *>(vec_table), - key_accessor); - return; - } -} - -template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> +template <typename AggFuncT, typename KeyT, typename ArgT, + bool is_key_nullable, bool is_argument_nullable, bool use_two_accessors, + typename KeyAccessorT, typename ArgAccessorT, typename BoolVectorT> inline void CollisionFreeVectorTable - ::upsertValueAccessorSumHelper(const Type *argument_type, - const attribute_id key_attr_id, - const attribute_id argument_id, - void *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor) { - DCHECK_GE(key_attr_id, 0); - DCHECK_GE(argument_id, 0); - DCHECK(argument_type != nullptr); - - switch (argument_type->getTypeID()) { - case TypeID::kInt: - upsertValueAccessorIntegerSum< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, int>( - key_attr_id, - argument_id, - static_cast<std::atomic<std::int64_t> *>(vec_table), - key_accessor, - argument_accessor); - return; - case TypeID::kLong: - upsertValueAccessorIntegerSum< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, std::int64_t>( - key_attr_id, - argument_id, - static_cast<std::atomic<std::int64_t> *>(vec_table), - key_accessor, - argument_accessor); - return; - case TypeID::kFloat: - upsertValueAccessorGenericSum< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, float>( - key_attr_id, - argument_id, - static_cast<std::atomic<double> *>(vec_table), - key_accessor, - argument_accessor); - return; - case TypeID::kDouble: - upsertValueAccessorGenericSum< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, double>( - key_attr_id, - argument_id, - static_cast<std::atomic<double> *>(vec_table), - key_accessor, - argument_accessor); - return; - default: - LOG(FATAL) << "Not supported"; - } -} + ::upsertValueAccessorInternalUnaryAtomic(const attribute_id key_attr_id, + const attribute_id argument_id, + void *vec_table, + BoolVectorT *existence_map, + KeyAccessorT *key_accessor, + ArgAccessorT *argument_accessor) { + auto *states = static_cast< + typename AggFuncT::template AggState<ArgT>::AtomicT *>(vec_table); -template <bool is_key_nullable, typename KeyT, typename ValueAccessorT> -inline void CollisionFreeVectorTable - ::upsertValueAccessorCountNullary(const attribute_id key_attr_id, - std::atomic<std::size_t> *vec_table, - ValueAccessorT *accessor) { - accessor->beginIteration(); - while (accessor->next()) { - const KeyT *key = static_cast<const KeyT *>( - accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); - if (is_key_nullable && key == nullptr) { - continue; - } - const 
std::size_t loc = *key;
-    vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
-    existence_map_->setBit(loc);
-  }
-}
-
-template <bool use_two_accessors, bool is_key_nullable, typename KeyT,
-          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeVectorTable
-    ::upsertValueAccessorCountUnary(const attribute_id key_attr_id,
-                                    const attribute_id argument_id,
-                                    std::atomic<std::size_t> *vec_table,
-                                    KeyValueAccessorT *key_accessor,
-                                    ArgumentValueAccessorT *argument_accessor) {
   key_accessor->beginIteration();
   if (use_two_accessors) {
     argument_accessor->beginIteration();
   }
-  while (key_accessor->next()) {
-    if (use_two_accessors) {
-      argument_accessor->next();
-    }
-    const KeyT *key = static_cast<const KeyT *>(
-        key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
-    if (is_key_nullable && key == nullptr) {
-      continue;
-    }
-    const std::size_t loc = *key;
-    existence_map_->setBit(loc);
-    if (argument_accessor->getUntypedValue(argument_id) == nullptr) {
-      continue;
-    }
-    vec_table[loc].fetch_add(1u, std::memory_order_relaxed);
-  }
-}
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-          typename KeyT, typename ArgumentT, typename StateT,
-          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
-inline void CollisionFreeVectorTable
-    ::upsertValueAccessorIntegerSum(const attribute_id key_attr_id,
-                                    const attribute_id argument_id,
-                                    std::atomic<StateT> *vec_table,
-                                    KeyValueAccessorT *key_accessor,
-                                    ArgumentValueAccessorT *argument_accessor) {
-  key_accessor->beginIteration();
-  if (use_two_accessors) {
-    argument_accessor->beginIteration();
-  }
   while (key_accessor->next()) {
     if (use_two_accessors) {
       argument_accessor->next();
     }
-    const KeyT *key = static_cast<const KeyT *>(
+
+    const auto *key = static_cast<const typename KeyT::cpptype *>(
         key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
     if (is_key_nullable && key == nullptr) {
       continue;
     }
     const std::size_t loc = *key;
-    existence_map_->setBit(loc);
-    const ArgumentT *argument = static_cast<const ArgumentT *>(
+    existence_map->set(loc);
+
+    const auto *argument = static_cast<const typename ArgT::cpptype *>(
         argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id));
     if (is_argument_nullable && argument == nullptr) {
       continue;
     }
-    vec_table[loc].fetch_add(*argument, std::memory_order_relaxed);
+
+    AggFuncT::template MergeArgAtomic<ArgT>(*argument, states + loc);
   }
 }
 
-template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable,
-          typename KeyT, typename ArgumentT, typename StateT,
-          typename KeyValueAccessorT, typename ArgumentValueAccessorT>
+template <typename AggFuncT, typename KeyT, typename ArgT,
+          bool is_key_nullable, bool is_argument_nullable, bool use_two_accessors,
+          typename KeyAccessorT, typename ArgAccessorT, typename BoolVectorT>
 inline void CollisionFreeVectorTable
-    ::upsertValueAccessorGenericSum(const attribute_id key_attr_id,
-                                    const attribute_id argument_id,
-                                    std::atomic<StateT> *vec_table,
-                                    KeyValueAccessorT *key_accessor,
-                                    ArgumentValueAccessorT *argument_accessor) {
+    ::upsertValueAccessorInternalUnaryLatch(const attribute_id key_attr_id,
+                                            const attribute_id argument_id,
+                                            void *vec_table,
+                                            BoolVectorT *existence_map,
+                                            KeyAccessorT *key_accessor,
+                                            ArgAccessorT *argument_accessor) {
+  auto *states = static_cast<
+      typename AggFuncT::template AggState<ArgT>::T *>(vec_table);
+
   key_accessor->beginIteration();
   if (use_two_accessors) {
     argument_accessor->beginIteration();
   }
+
   while (key_accessor->next()) {
     if (use_two_accessors) {
       argument_accessor->next();
     }
-    const KeyT *key = static_cast<const KeyT *>(
+
+    const auto *key = static_cast<const typename KeyT::cpptype *>(
         key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id));
     if (is_key_nullable && key == nullptr) {
       continue;
     }
     const std::size_t loc = *key;
-    existence_map_->setBit(loc);
-    const ArgumentT *argument = static_cast<const ArgumentT *>(
+    existence_map->set(loc);
+
+    const auto *argument = static_cast<const typename ArgT::cpptype *>(
         argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id));
     if (is_argument_nullable && argument == nullptr) {
       continue;
     }
-    const ArgumentT arg_val = *argument;
-    std::atomic<StateT> &state = vec_table[loc];
-    StateT state_val = state.load(std::memory_order_relaxed);
-    while (!state.compare_exchange_weak(state_val, state_val + arg_val)) {}
+
+    SpinMutexLock lock(mutex_vec_[loc]);
+    AggFuncT::template MergeArgUnsafe<ArgT>(*argument, states + loc);
   }
 }
 
-template <typename KeyT>
+template <typename KeyT, typename BoolVectorT>
 inline void CollisionFreeVectorTable
     ::finalizeKeyInternal(const std::size_t start_position,
                           const std::size_t end_position,
+                          BoolVectorT *existence_map,
                           NativeColumnVector *output_cv) const {
-  std::size_t loc = start_position - 1;
-  while ((loc = existence_map_->nextOne(loc)) < end_position) {
-    *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc;
+  for (std::size_t loc = start_position; loc < end_position; ++loc) {
+    if (existence_map->get(loc)) {
+      *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc;
+    }
   }
 }
 
-template <typename ...ArgTypes>
+template <typename AggFuncT, typename ArgT, typename BoolVectorT>
 inline void CollisionFreeVectorTable
-    ::finalizeStateDispatchHelper(const AggregationID agg_id,
-                                  const Type *argument_type,
+    ::finalizeStateInternalAtomic(const std::size_t start_position,
+                                  const std::size_t end_position,
                                   const void *vec_table,
-                                  ArgTypes &&...args) const {
-  switch (agg_id) {
-    case AggregationID::kCount:
-      finalizeStateCount(static_cast<const std::atomic<std::size_t> *>(vec_table),
-                         std::forward<ArgTypes>(args)...);
-      return;
-    case AggregationID::kSum:
-      finalizeStateSumHelper(argument_type,
-                             vec_table,
-                             std::forward<ArgTypes>(args)...);
-      return;
-    default:
-      LOG(FATAL) << "Not supported";
-  }
-}
-
-template <typename ...ArgTypes>
-inline void CollisionFreeVectorTable
-    ::finalizeStateSumHelper(const Type *argument_type,
-                             const void *vec_table,
-                             ArgTypes &&...args) const {
-  DCHECK(argument_type != nullptr);
-
-  switch (argument_type->getTypeID()) {
-    case TypeID::kInt:  // Fall through
-    case TypeID::kLong:
-      finalizeStateSum<std::int64_t>(
-          static_cast<const std::atomic<std::int64_t> *>(vec_table),
-          std::forward<ArgTypes>(args)...);
-      return;
-    case TypeID::kFloat:  // Fall through
-    case TypeID::kDouble:
-      finalizeStateSum<double>(
-          static_cast<const std::atomic<double> *>(vec_table),
-          std::forward<ArgTypes>(args)...);
-      return;
-    default:
-      LOG(FATAL) << "Not supported";
+                                  BoolVectorT *existence_map,
+                                  NativeColumnVector *output_cv) const {
+  using StateT = typename AggFuncT::template AggState<ArgT>;
+  using ResultT = typename StateT::ResultT;
+
+  const auto *states = static_cast<const typename StateT::AtomicT *>(vec_table);
+
+  for (std::size_t loc = start_position; loc < end_position; ++loc) {
+    if (existence_map->get(loc)) {
+      AggFuncT::template FinalizeAtomic<ArgT>(
+          states[loc],
+          static_cast<ResultT *>(output_cv->getPtrForDirectWrite()));
+    }
   }
 }
 
+template <typename AggFuncT, typename ArgT, typename BoolVectorT>
 inline void CollisionFreeVectorTable
-    ::finalizeStateCount(const std::atomic<std::size_t> *vec_table,
-                         const std::size_t start_position,
-                         const std::size_t end_position,
-                         NativeColumnVector *output_cv) const {
-  std::size_t loc = start_position - 1;
-  while ((loc = existence_map_->nextOne(loc)) < end_position) {
-    *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) =
-        vec_table[loc].load(std::memory_order_relaxed);
+    ::finalizeStateInternalLatch(const std::size_t start_position,
+                                 const std::size_t end_position,
+                                 const void *vec_table,
+                                 BoolVectorT *existence_map,
+                                 NativeColumnVector *output_cv) const {
+  using StateT = typename AggFuncT::template AggState<ArgT>;
+  using ResultT = typename StateT::ResultT;
+
+  const auto *states = static_cast<const typename StateT::T *>(vec_table);
+
+  for (std::size_t loc = start_position; loc < end_position; ++loc) {
+    if (existence_map->get(loc)) {
+      AggFuncT::template FinalizeUnsafe<ArgT>(
+          states[loc],
+          static_cast<ResultT *>(output_cv->getPtrForDirectWrite()));
+    }
   }
 }
 
-template <typename ResultT, typename StateT>
-inline void CollisionFreeVectorTable
-    ::finalizeStateSum(const std::atomic<StateT> *vec_table,
-                       const std::size_t start_position,
-                       const std::size_t end_position,
-                       NativeColumnVector *output_cv) const {
-  std::size_t loc = start_position - 1;
-  while ((loc = existence_map_->nextOne(loc)) < end_position) {
-    *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) =
-        vec_table[loc].load(std::memory_order_relaxed);
-  }
-}
 
 }  // namespace quickstep
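
[Aside, not part of the commit] The two upsert specializations above encode two different merge disciplines: MergeArgAtomic for states that can live in a lock-free std::atomic (e.g. integer sums via fetch_add), and MergeArgUnsafe for states that cannot, serialized by a per-slot SpinMutexLock. Below is a minimal, self-contained sketch of that split, with std::mutex standing in for Quickstep's SpinMutex; SumFunc, int_states, double_states and latches are invented names, not part of the patch.

// Sketch only: illustrates the atomic vs. latched merge split used by the
// collision-free table. std::mutex stands in for SpinMutex; all names here
// are hypothetical.
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <mutex>
#include <vector>

struct SumFunc {
  // Atomic path: safe for concurrent writers when the state type supports
  // lock-free accumulation.
  static void MergeArgAtomic(std::int64_t value,
                             std::atomic<std::int64_t> *state) {
    state->fetch_add(value, std::memory_order_relaxed);
  }
  // "Unsafe" path: plain merge; the caller serializes access per slot.
  static void MergeArgUnsafe(double value, double *state) {
    *state += value;
  }
};

int main() {
  // One state slot per group-by key value, as in the collision-free table.
  std::vector<std::atomic<std::int64_t>> int_states(4);
  for (auto &s : int_states) {
    s.store(0, std::memory_order_relaxed);
  }
  std::vector<double> double_states(4, 0.0);
  std::vector<std::mutex> latches(4);  // one latch per slot

  // Atomic merge: an integer argument keyed to slot 2.
  SumFunc::MergeArgAtomic(10, &int_states[2]);

  // Latched merge: a floating-point argument keyed to slot 3.
  {
    std::lock_guard<std::mutex> lock(latches[3]);
    SumFunc::MergeArgUnsafe(2.5, &double_states[3]);
  }

  std::printf("slot2=%lld slot3=%f\n",
              static_cast<long long>(int_states[2].load()),
              double_states[3]);
  return 0;
}
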
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/PackedPayloadHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp
index bf5eaee..bd7e960 100644
--- a/storage/PackedPayloadHashTable.cpp
+++ b/storage/PackedPayloadHashTable.cpp
@@ -256,6 +256,10 @@ bool PackedPayloadHashTable::upsertValueAccessorCompositeKey(
 void PackedPayloadHashTable::resize(const std::size_t extra_buckets,
                                     const std::size_t extra_variable_storage,
                                     const std::size_t retry_num) {
+  LOG(FATAL) << "Resize " << numEntries() << " + "
+             << extra_buckets << " + " << extra_variable_storage
+             << " -- " << header_->num_buckets;
+
   // A retry should never be necessary with this implementation of HashTable.
   // Separate chaining ensures that any resized hash table with more buckets
   // than the original table will be able to hold more entries than the

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/storage/PackedPayloadHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
index f87a1de..9ba5500 100644
--- a/storage/PackedPayloadHashTable.hpp
+++ b/storage/PackedPayloadHashTable.hpp
@@ -95,6 +95,10 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
 
   void destroyPayload() override;
 
+  inline std::size_t getNumFinalizationPartitions() const {
+    return CalculateNumFinalizationPartitions(numEntries());
+  }
+
   /**
    * @brief Use aggregation handles to update (multiple) aggregation states in
    *        this hash table, with group-by keys and arguments drawn from the
@@ -287,6 +291,11 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
   template <typename FunctorT>
   inline std::size_t forEachCompositeKey(FunctorT *functor) const;
 
+  template <typename FunctorT>
+  inline void forEachCompositeKeyInPartition(
+      const std::size_t partition_id,
+      const FunctorT &functor) const;
+
   /**
    * @brief Apply a functor to each (key, aggregation state) pair in this hash
    *        table, where the aggregation state is retrieved from the value
@@ -328,6 +337,25 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
     return total;
   }
 
+  inline std::size_t calculatePartitionLength() const {
+    const std::size_t num_finalize_partitions = getNumFinalizationPartitions();
+    const std::size_t partition_length =
+        (numEntries() + num_finalize_partitions - 1) / num_finalize_partitions;
+    DCHECK_GE(partition_length, 0u);
+    return partition_length;
+  }
+
+  inline std::size_t calculatePartitionStartPosition(
+      const std::size_t partition_id) const {
+    return calculatePartitionLength() * partition_id;
+  }
+
+  inline std::size_t calculatePartitionEndPosition(
+      const std::size_t partition_id) const {
+    return std::min(calculatePartitionLength() * (partition_id + 1),
+                    numEntries());
+  }
+
   inline bool getNextEntry(TypedValue *key,
                            const std::uint8_t **value,
                            std::size_t *entry_num) const;
@@ -438,6 +466,15 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
            kBucketAlignment;
   }
 
+  inline static std::size_t CalculateNumFinalizationPartitions(
+      const std::size_t num_entries) {
+    // Set finalization segment size as 4096 entries.
+    constexpr std::size_t kFinalizeSegmentSize = 4uL * 1024L;
+
+    // At least 1 partition, at most 80 partitions.
+    return std::max(1uL, std::min(num_entries / kFinalizeSegmentSize, 80uL));
+  }
+
   // Attempt to find an empty bucket to insert 'hash_code' into, starting after
   // '*bucket' in the chain (or, if '*bucket' is NULL, starting from the slot
   // array). Returns true and stores SIZE_T_MAX in '*pending_chain_ptr' if an
@@ -975,6 +1012,29 @@ inline std::size_t PackedPayloadHashTable::forEachCompositeKey(
 }
 
 template <typename FunctorT>
+inline void PackedPayloadHashTable::forEachCompositeKeyInPartition(
+    const std::size_t partition_id,
+    const FunctorT &functor) const {
+  const std::size_t start_position =
+      calculatePartitionStartPosition(partition_id);
+  const std::size_t end_position =
+      calculatePartitionEndPosition(partition_id);
+
+  std::vector<TypedValue> key;
+  for (std::size_t i = start_position; i < end_position; ++i) {
+    const char *bucket =
+        static_cast<const char *>(buckets_) + i * bucket_size_;
+    for (std::vector<const Type *>::size_type key_idx = 0;
+         key_idx < this->key_types_.size();
+         ++key_idx) {
+      key.emplace_back(key_manager_.getKeyComponentTyped(bucket, key_idx));
+    }
+    functor(key);
+    key.clear();
+  }
+}
+
+template <typename FunctorT>
 inline std::size_t PackedPayloadHashTable::forEachCompositeKey(
     FunctorT *functor,
     const std::size_t index) const {
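
[Aside, not part of the commit] To make the finalization partitioning above concrete: CalculateNumFinalizationPartitions() targets 4096-entry segments and clamps the result to the range [1, 80], and calculatePartitionLength() ceiling-divides the entry count by that number. The sketch below mirrors the same arithmetic outside the class; the 1,000,000-entry table size in main() is an invented example value. For that size it gives 80 partitions of 12,500 buckets each, with partition 79 covering [987500, 1000000).

// Sketch only: the finalization-partition arithmetic, mirrored outside
// PackedPayloadHashTable. The entry count in main() is hypothetical.
#include <algorithm>
#include <cstddef>
#include <cstdio>

std::size_t NumFinalizationPartitions(const std::size_t num_entries) {
  constexpr std::size_t kFinalizeSegmentSize = 4096;  // entries per segment
  const std::size_t by_size = num_entries / kFinalizeSegmentSize;
  return std::max<std::size_t>(1, std::min<std::size_t>(by_size, 80));
}

std::size_t PartitionLength(const std::size_t num_entries) {
  const std::size_t parts = NumFinalizationPartitions(num_entries);
  return (num_entries + parts - 1) / parts;  // ceiling division
}

int main() {
  const std::size_t num_entries = 1000000;
  const std::size_t parts = NumFinalizationPartitions(num_entries);  // 80
  const std::size_t length = PartitionLength(num_entries);           // 12500

  // Bounds follow calculatePartitionStartPosition()/EndPosition().
  for (std::size_t pid = 0; pid < parts; ++pid) {
    const std::size_t start = length * pid;
    const std::size_t end = std::min(length * (pid + 1), num_entries);
    if (pid == 0 || pid == parts - 1) {
      std::printf("partition %zu: [%zu, %zu)\n", pid, start, end);
    }
  }
  std::printf("partitions=%zu length=%zu\n", parts, length);
  return 0;
}
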
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/utility/BoolVector.hpp
----------------------------------------------------------------------
diff --git a/utility/BoolVector.hpp b/utility/BoolVector.hpp
new file mode 100644
index 0000000..1f16fc7
--- /dev/null
+++ b/utility/BoolVector.hpp
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_BOOL_VECTOR_HPP_
+#define QUICKSTEP_UTILITY_BOOL_VECTOR_HPP_
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <memory>
+#include <vector>
+
+#include "threading/SpinMutex.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+class BoolVector {
+ public:
+  BoolVector(void *memory_location,
+             const std::size_t length,
+             const bool initialize)
+      : owned_(false),
+        length_(length),
+        data_array_(static_cast<bool *>(memory_location)) {
+    DCHECK_GT(length, 0u);
+    DCHECK(data_array_ != nullptr);
+
+    if (initialize) {
+      clear();
+    }
+  }
+
+  explicit BoolVector(const std::size_t length)
+      : owned_(true),
+        length_(length),
+        data_array_(static_cast<bool *>(std::malloc(sizeof(bool) * length))) {
+    DCHECK_GT(length, 0u);
+    clear();
+  }
+
+  ~BoolVector() {
+    if (owned_) {
+      std::free(data_array_);
+    }
+  }
+
+  inline void clear() {
+    std::memset(data_array_, 0, sizeof(bool) * length_);
+  }
+
+  inline void set(const std::size_t loc) {
+    data_array_[loc] = true;
+  }
+
+  inline bool get(const std::size_t loc) const {
+    return data_array_[loc];
+  }
+
+  inline void unionWith(const BoolVector &other,
+                        const std::size_t start_position,
+                        const std::size_t end_position) const {
+    for (std::size_t loc = start_position; loc < end_position; ++loc) {
+      data_array_[loc] |= other.data_array_[loc];
+    }
+  }
+
+  inline std::size_t onesCountInRange(const std::size_t start_position,
+                                      const std::size_t end_position) const {
+    DCHECK_LE(start_position, end_position);
+    DCHECK_LT(start_position, length_);
+    DCHECK_LE(end_position, length_);
+
+    std::size_t count = 0;
+    for (std::size_t i = start_position; i < end_position; ++i) {
+      count += data_array_[i];
+    }
+    return count;
+  }
+
+ private:
+  const bool owned_;
+  const std::size_t length_;
+  bool *data_array_;
+
+  DISALLOW_COPY_AND_ASSIGN(BoolVector);
+};
+
+class BoolVectorPool {
+ public:
+  explicit BoolVectorPool(const std::size_t vector_length)
+      : vector_length_(vector_length) {}
+
+  BoolVector* checkOut() {
+    {
+      SpinMutexLock lock(mutex_);
+      if (!pool_.empty()) {
+        BoolVector *ret = pool_.back().release();
+        pool_.pop_back();
+        return ret;
+      }
+    }
+    return new BoolVector(vector_length_);
+  }
+
+  void checkIn(BoolVector *bool_vector) {
+    SpinMutexLock lock(mutex_);
+    pool_.emplace_back(bool_vector);
+  }
+
+  std::vector<std::unique_ptr<BoolVector>>& getAll() {
+    return pool_;
+  }
+
+  const std::vector<std::unique_ptr<BoolVector>>& getAll() const {
+    return pool_;
+  }
+
+ private:
+  const std::size_t vector_length_;
+
+  SpinMutex mutex_;
+  std::vector<std::unique_ptr<BoolVector>> pool_;
+
+  DISALLOW_COPY_AND_ASSIGN(BoolVectorPool);
+};
+
+class BarrieredReadWriteConcurrentBoolVector {
+ public:
+  BarrieredReadWriteConcurrentBoolVector(void *memory_location,
+                                         const std::size_t length,
+                                         const bool initialize)
+      : owned_(false),
+        length_(length),
+        data_array_(static_cast<DataType *>(memory_location)) {
+    DCHECK_GT(length, 0u);
+    DCHECK(data_array_ != nullptr);
+
+    if (initialize) {
+      clear();
+    }
+  }
+
+  explicit BarrieredReadWriteConcurrentBoolVector(const std::size_t length)
+      : owned_(true),
+        length_(length),
+        data_array_(static_cast<DataType *>(std::malloc(BytesNeeded(length)))) {
+    DCHECK_GT(length, 0u);
+    clear();
+  }
+
+  ~BarrieredReadWriteConcurrentBoolVector() {
+    if (owned_) {
+      std::free(data_array_);
+    }
+  }
+
+  inline static std::size_t BytesNeeded(const std::size_t length) {
+    return kDataSize * length;
+  }
+
+  inline void clear() {
+    std::memset(data_array_, 0, BytesNeeded(length_));
+  }
+
+  inline void set(const std::size_t loc) {
+    data_array_[loc].store(true, std::memory_order_relaxed);
+  }
+
+  inline bool get(const std::size_t loc) const {
+    return data_array_[loc].load(std::memory_order_relaxed);
+  }
+
+  inline std::size_t onesCountInRange(const std::size_t start_position,
+                                      const std::size_t end_position) const {
+    DCHECK_LE(start_position, end_position);
+    DCHECK_LT(start_position, length_);
+    DCHECK_LE(end_position, length_);
+
+    std::size_t count = 0;
+    for (std::size_t i = start_position; i < end_position; ++i) {
+      count += data_array_[i].load(std::memory_order_relaxed);
+    }
+    return count;
+  }
+
+ private:
+  typedef std::atomic<bool> DataType;
+  static constexpr std::size_t kDataSize = sizeof(DataType);
+
+  const bool owned_;
+  const std::size_t length_;
+  DataType *data_array_;
+
+  DISALLOW_COPY_AND_ASSIGN(BarrieredReadWriteConcurrentBoolVector);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_BOOL_VECTOR_HPP_
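
[Aside, not part of the commit] A short usage sketch of the three helpers in the new BoolVector.hpp: BoolVector for a single writer, BoolVectorPool for handing thread-private existence maps to workers, and BarrieredReadWriteConcurrentBoolVector when several threads set bits on the same map with relaxed atomics. It assumes compilation inside the Quickstep tree (the header pulls in glog and SpinMutex, so link against glog and pthread); the slot and thread counts are arbitrary example values.

// Sketch only: exercises the BoolVector helpers added above. Assumes the
// Quickstep include path; sizes and thread counts are arbitrary.
#include <cstddef>
#include <cstdio>
#include <thread>
#include <vector>

#include "utility/BoolVector.hpp"

int main() {
  constexpr std::size_t kNumSlots = 1024;

  // Heap-owning, single-writer existence map.
  quickstep::BoolVector local_map(kNumSlots);
  local_map.set(7);
  std::printf("ones in [0, 16) = %zu\n", local_map.onesCountInRange(0, 16));

  // Pool usage: each worker checks out a private vector, then returns it.
  // (Returned vectors keep their bits; a caller would clear() before reuse.)
  quickstep::BoolVectorPool pool(kNumSlots);
  std::vector<std::thread> workers;
  for (int t = 0; t < 4; ++t) {
    workers.emplace_back([&pool, t]() {
      quickstep::BoolVector *bv = pool.checkOut();
      bv->set(static_cast<std::size_t>(t));  // thread-private, no latch needed
      pool.checkIn(bv);
    });
  }
  for (std::thread &w : workers) {
    w.join();
  }

  // Concurrent variant: relaxed atomic bits, safe for simultaneous writers.
  quickstep::BarrieredReadWriteConcurrentBoolVector shared_map(kNumSlots);
  shared_map.set(42);
  std::printf("shared ones in [0, 64) = %zu\n",
              shared_map.onesCountInRange(0, 64));
  return 0;
}
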
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/bc81c5b3/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index ca04462..e9a978e 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -168,6 +168,7 @@ add_library(quickstep_utility_BloomFilter ../empty_src.cpp BloomFilter.hpp)
 add_library(quickstep_utility_BloomFilter_proto
             ${quickstep_utility_BloomFilter_proto_srcs}
             ${quickstep_utility_BloomFilter_proto_hdrs})
+add_library(quickstep_utility_BoolVector ../empty_src.cpp BoolVector.hpp)
 add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.cpp CalculateInstalledMemory.hpp)
 add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
 add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
@@ -233,6 +234,9 @@ target_link_libraries(quickstep_utility_BloomFilter
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_utility_BloomFilter_proto
                       ${PROTOBUF_LIBRARY})
+target_link_libraries(quickstep_utility_BoolVector
+                      quickstep_threading_SpinMutex
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_utility_CalculateInstalledMemory
                       glog)
 target_link_libraries(quickstep_utility_CheckSnprintf
@@ -341,6 +345,7 @@ target_link_libraries(quickstep_utility
                       quickstep_utility_BitVector
                       quickstep_utility_BloomFilter
                       quickstep_utility_BloomFilter_proto
+                      quickstep_utility_BoolVector
                       quickstep_utility_CalculateInstalledMemory
                       quickstep_utility_Cast
                       quickstep_utility_CheckSnprintf