Repository: incubator-quickstep Updated Branches: refs/heads/agg-expr [created] 38987a5cf
Updates Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/38987a5c Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/38987a5c Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/38987a5c Branch: refs/heads/agg-expr Commit: 38987a5cf56fce23dcb31c3f0a83d050035eae12 Parents: 1572762 Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Wed Feb 22 13:58:08 2017 -0600 Committer: Jianqiao Zhu <jianq...@cs.wisc.edu> Committed: Wed Feb 22 13:58:08 2017 -0600 ---------------------------------------------------------------------- expressions/aggregation/AggFunc.hpp | 149 +++++++++ expressions/aggregation/CMakeLists.txt | 10 + storage/CMakeLists.txt | 3 +- storage/CollisionFreeVectorTable.cpp | 276 +++++++--------- storage/CollisionFreeVectorTable.hpp | 495 ++-------------------------- utility/BoolVector.hpp | 148 +++++++++ utility/CMakeLists.txt | 4 + 7 files changed, 451 insertions(+), 634 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/38987a5c/expressions/aggregation/AggFunc.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggFunc.hpp b/expressions/aggregation/AggFunc.hpp new file mode 100644 index 0000000..ef6179d --- /dev/null +++ b/expressions/aggregation/AggFunc.hpp @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_EXPRESSIONS_AGGREGATION_AGG_FUNC_HPP_ +#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGG_FUNC_HPP_ + +#include <atomic> +#include <cstddef> +#include <cstdint> +#include <type_traits> + +#include "expressions/aggregation/AggregationID.hpp" +#include "utility/Macros.hpp" +#include "types/IntType.hpp" +#include "types/LongType.hpp" +#include "types/FloatType.hpp" +#include "types/DoubleType.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +class ColumnVector; +class StorageManager; +class Type; + +/** \addtogroup Expressions + * @{ + */ + +struct InvalidType {}; + +template <typename T, typename U> +struct is_different : std::true_type {}; + +template <typename T> +struct is_different<T, T> : std::false_type {}; + +class Sum { + public: + Sum() {} + + template <typename ArgType> + struct AggState { + typedef InvalidType T; + typedef InvalidType AtomicT; + typedef InvalidType ResultT; + }; + + template <typename ArgType> + struct HasAtomicImpl : + is_different<InvalidType, + typename AggState<ArgType>::AtomicT> {}; + + template <typename ArgType> + inline static void MergeArgAtomic(const typename ArgType::cpptype &value, + typename AggState<ArgType>::AtomicT *state); + + template <typename ArgType> + inline static void FinalizeAtomic(const typename AggState<ArgType>::AtomicT &state, + typename AggState<ArgType>::ResultT *result) { + *result = state.load(std::memory_order_relaxed); + } + + private: + DISALLOW_COPY_AND_ASSIGN(Sum); +}; + +//------------------------------------------------------------------------------ +// Implementation of Sum for IntType +template <> +struct Sum::AggState<IntType> { + typedef std::int64_t T; + typedef std::atomic<std::int64_t> AtomicT; + typedef std::int64_t ResultT; +}; + +template <> +inline void Sum::MergeArgAtomic<IntType>(const IntType::cpptype &value, + AggState<IntType>::AtomicT *state) { + state->fetch_add(value, std::memory_order_relaxed); +} + +//------------------------------------------------------------------------------ +// Implementation of Sum for LongType +template <> +struct Sum::AggState<LongType> { + typedef std::int64_t T; + typedef std::atomic<std::int64_t> AtomicT; + typedef std::int64_t ResultT; +}; + +template <> +inline void Sum::MergeArgAtomic<LongType>(const LongType::cpptype &value, + AggState<LongType>::AtomicT *state) { + state->fetch_add(value, std::memory_order_relaxed); +} + +//------------------------------------------------------------------------------ +// Implementation of Sum for FloatType +template <> +struct Sum::AggState<FloatType> { + typedef double T; + typedef std::atomic<double> AtomicT; + typedef double ResultT; +}; + +template <> +inline void Sum::MergeArgAtomic<FloatType>(const FloatType::cpptype &value, + AggState<FloatType>::AtomicT *state) { + AggState<FloatType>::T state_val = state->load(std::memory_order_relaxed); + while (!state->compare_exchange_weak(state_val, state_val + value)) {} +} + +//------------------------------------------------------------------------------ +// Implementation of Sum for DoubleType +template <> +struct Sum::AggState<DoubleType> { + typedef double T; + typedef std::atomic<double> AtomicT; + typedef double ResultT; +}; + +template <> +inline void Sum::MergeArgAtomic<DoubleType>(const DoubleType::cpptype &value, + AggState<DoubleType>::AtomicT *state) { + AggState<DoubleType>::T state_val = state->load(std::memory_order_relaxed); + while (!state->compare_exchange_weak(state_val, state_val + value)) {} +} + +} // namespace quickstep + +#endif // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGG_FUNC_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/38987a5c/expressions/aggregation/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt index 4220a8d..c0ebad7 100644 --- a/expressions/aggregation/CMakeLists.txt +++ b/expressions/aggregation/CMakeLists.txt @@ -20,6 +20,7 @@ QS_PROTOBUF_GENERATE_CPP(expressions_aggregation_AggregateFunction_proto_srcs AggregateFunction.proto) # Declare micro-libs: +add_library(quickstep_expressions_aggregation_AggFunc ../../empty_src.cpp AggFunc.hpp) add_library(quickstep_expressions_aggregation_AggregateFunction AggregateFunction.cpp AggregateFunction.hpp) @@ -69,6 +70,14 @@ add_library(quickstep_expressions_aggregation_AggregationID AggregationID.hpp) # Link dependencies: +target_link_libraries(quickstep_expressions_aggregation_AggFunc + glog + quickstep_expressions_aggregation_AggregationID + quickstep_types_DoubleType + quickstep_types_FloatType + quickstep_types_IntType + quickstep_types_LongType + quickstep_utility_Macros) target_link_libraries(quickstep_expressions_aggregation_AggregateFunction glog quickstep_expressions_aggregation_AggregateFunction_proto @@ -236,6 +245,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum # Submodule all-in-one library: add_library(quickstep_expressions_aggregation ../../empty_src.cpp) target_link_libraries(quickstep_expressions_aggregation + quickstep_expressions_aggregation_AggFunc quickstep_expressions_aggregation_AggregateFunction quickstep_expressions_aggregation_AggregateFunction_proto quickstep_expressions_aggregation_AggregateFunctionAvg http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/38987a5c/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index 293be17..b383a14 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -437,6 +437,7 @@ endif() # CMAKE_VALIDATE_IGNORE_END target_link_libraries(quickstep_storage_CollisionFreeVectorTable quickstep_catalog_CatalogTypedefs + quickstep_expressions_aggregation_AggFunc quickstep_expressions_aggregation_AggregationHandle quickstep_expressions_aggregation_AggregationID quickstep_storage_HashTableBase @@ -451,7 +452,7 @@ target_link_libraries(quickstep_storage_CollisionFreeVectorTable quickstep_types_TypeID quickstep_types_containers_ColumnVector quickstep_types_containers_ColumnVectorsValueAccessor - quickstep_utility_BarrieredReadWriteConcurrentBitVector + quickstep_utility_BoolVector quickstep_utility_Macros) target_link_libraries(quickstep_storage_ColumnStoreUtil quickstep_catalog_CatalogAttribute http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/38987a5c/storage/CollisionFreeVectorTable.cpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp index d836014..2bafc4c 100644 --- a/storage/CollisionFreeVectorTable.cpp +++ b/storage/CollisionFreeVectorTable.cpp @@ -24,6 +24,7 @@ #include <cstdint> #include <cstdlib> #include <memory> +#include <type_traits> #include <vector> #include "expressions/aggregation/AggregationHandle.hpp" @@ -33,13 +34,72 @@ #include "storage/ValueAccessor.hpp" #include "storage/ValueAccessorMultiplexer.hpp" #include "storage/ValueAccessorUtil.hpp" +#include "types/TypeID.hpp" #include "types/containers/ColumnVectorsValueAccessor.hpp" #include "utility/BarrieredReadWriteConcurrentBitVector.hpp" +#include "utility/BoolVector.hpp" #include "glog/logging.h" namespace quickstep { +namespace { + + +template <typename FunctorT> +inline auto InvokeOnType(const Type &type, + const FunctorT &functor) { + switch (type.getTypeID()) { + case TypeID::kInt: + return functor(static_cast<const IntType&>(type)); + case TypeID::kLong: + return functor(static_cast<const LongType&>(type)); + case TypeID::kFloat: + return functor(static_cast<const FloatType&>(type)); + case TypeID::kDouble: + return functor(static_cast<const DoubleType&>(type)); + default: + LOG(FATAL) << "Not supported"; + } +} + +template <typename FunctorT> +inline auto InvokeOnBool(const bool &val, + FunctorT &functor) { + if (val) { + return functor(std::true_type()); + } else { + return functor(std::false_type()); + } +} + +template <typename FunctorT> +inline auto InvokeOnAggFunc(const AggregationID &agg_id, + const FunctorT &functor) { + switch (agg_id) { + case AggregationID::kSum: { + return functor(Sum()); + } + default: + LOG(FATAL) << "Not supported"; + } +} + +template <typename FunctorT> +inline auto InvokeIf(const std::true_type &val, + const FunctorT &functor) { + std::cout << "True\n"; + return functor(); +} + +template <typename FunctorT> +inline void InvokeIf(const std::false_type &val, + const FunctorT &functor) { + std::cout << "False\n"; +} + +} // namespace + CollisionFreeVectorTable::CollisionFreeVectorTable( const Type *key_type, const std::size_t num_entries, @@ -63,32 +123,31 @@ CollisionFreeVectorTable::CollisionFreeVectorTable( for (std::size_t i = 0; i < num_handles_; ++i) { const AggregationHandle *handle = handles_[i]; const std::vector<const Type *> argument_types = handle->getArgumentTypes(); + DCHECK_EQ(1u, argument_types.size()); std::size_t state_size = 0; - switch (handle->getAggregationID()) { - case AggregationID::kCount: { - state_size = sizeof(std::atomic<std::size_t>); - break; - } - case AggregationID::kSum: { - DCHECK_EQ(1u, argument_types.size()); - switch (argument_types.front()->getTypeID()) { - case TypeID::kInt: // Fall through - case TypeID::kLong: - state_size = sizeof(std::atomic<std::int64_t>); - break; - case TypeID::kFloat: // Fall through - case TypeID::kDouble: - state_size = sizeof(std::atomic<double>); - break; - default: - LOG(FATAL) << "Not implemented"; - } - break; - } - default: - LOG(FATAL) << "Not implemented"; - } + InvokeOnAggFunc( + handle->getAggregationID(), + [&](const auto &agg_func) -> void { + InvokeOnType( + *argument_types.front(), + [&](const auto &type) -> void { + using AggFuncT = std::remove_reference_t<decltype(agg_func)>; + using ArgT = std::remove_reference_t<decltype(type)>; + + std::cout << AggFuncT::template HasAtomicImpl<IntType>::value << "\n"; + std::cout << AggFuncT::template HasAtomicImpl<LongType>::value << "\n"; + std::cout << AggFuncT::template HasAtomicImpl<FloatType>::value << "\n"; + std::cout << AggFuncT::template HasAtomicImpl<DoubleType>::value << "\n"; + + InvokeIf( + typename AggFuncT::template HasAtomicImpl<ArgT>(), + [&]() -> void { + state_size = sizeof(typename AggFuncT::template AggState<ArgT>::AtomicT); + }); + }); + }); + std::cout << state_size << "\n"; state_offsets.emplace_back(required_memory); required_memory += CacheLineAlignedBytes(state_size * num_entries); @@ -129,157 +188,46 @@ bool CollisionFreeVectorTable::upsertValueAccessorCompositeKey( const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, const std::vector<MultiSourceAttributeId> &key_ids, const ValueAccessorMultiplexer &accessor_mux) { - DCHECK_EQ(1u, key_ids.size()); - - if (handles_.empty()) { - InvokeOnValueAccessorMaybeTupleIdSequenceAdapter( - accessor_mux.getValueAccessorBySource(key_ids.front().source), - [&key_ids, this](auto *accessor) -> void { // NOLINT(build/c++11) - this->upsertValueAccessorKeyOnlyHelper(key_type_->isNullable(), - key_type_, - key_ids.front().attr_id, - accessor); - }); - return true; - } +// DCHECK_EQ(1u, key_ids.size()); +// +// if (handles_.empty()) { +// LOG(FATAL) << "Not implemented"; +// } +// +// DCHECK(accessor_mux.getDerivedAccessor() == nullptr || +// accessor_mux.getDerivedAccessor()->getImplementationType() +// == ValueAccessor::Implementation::kColumnVectors); +// +// ValueAccessor *base_accessor = accessor_mux.getBaseAccessor(); +// ColumnVectorsValueAccessor *derived_accesor = +// static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor()); +// +// const ValueAccessorSource key_source = key_ids.front().source; +// const attribute_id key_id = key_ids.front().attr_id; +// const bool is_key_nullable = key_type_->isNullable(); +// +// InvokeOnType( +// *key_type_, +// [&](const auto &key_type) { +// InvokeOnType( +// *argument_type_, +// [&](const auto &key_type) { +// +// }); +// }); + + (void) key_type_; - DCHECK(accessor_mux.getDerivedAccessor() == nullptr || - accessor_mux.getDerivedAccessor()->getImplementationType() - == ValueAccessor::Implementation::kColumnVectors); - - ValueAccessor *base_accessor = accessor_mux.getBaseAccessor(); - ColumnVectorsValueAccessor *derived_accesor = - static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor()); - - // Dispatch to specialized implementations to achieve maximum performance. - InvokeOnValueAccessorMaybeTupleIdSequenceAdapter( - base_accessor, - [&argument_ids, &key_ids, &derived_accesor, this](auto *accessor) -> void { // NOLINT(build/c++11) - const ValueAccessorSource key_source = key_ids.front().source; - const attribute_id key_id = key_ids.front().attr_id; - const bool is_key_nullable = key_type_->isNullable(); - - for (std::size_t i = 0; i < num_handles_; ++i) { - DCHECK_LE(argument_ids[i].size(), 1u); - - const AggregationHandle *handle = handles_[i]; - const auto &argument_types = handle->getArgumentTypes(); - const auto &argument_ids_i = argument_ids[i]; - - ValueAccessorSource argument_source; - attribute_id argument_id; - const Type *argument_type; - bool is_argument_nullable; - - if (argument_ids_i.empty()) { - argument_source = ValueAccessorSource::kInvalid; - argument_id = kInvalidAttributeID; - - DCHECK(argument_types.empty()); - argument_type = nullptr; - is_argument_nullable = false; - } else { - DCHECK_EQ(1u, argument_ids_i.size()); - argument_source = argument_ids_i.front().source; - argument_id = argument_ids_i.front().attr_id; - - DCHECK_EQ(1u, argument_types.size()); - argument_type = argument_types.front(); - is_argument_nullable = argument_type->isNullable(); - } - - if (key_source == ValueAccessorSource::kBase) { - if (argument_source == ValueAccessorSource::kBase) { - this->upsertValueAccessorDispatchHelper<false>(is_key_nullable, - is_argument_nullable, - key_type_, - argument_type, - handle->getAggregationID(), - key_id, - argument_id, - vec_tables_[i], - accessor, - accessor); - } else { - this->upsertValueAccessorDispatchHelper<true>(is_key_nullable, - is_argument_nullable, - key_type_, - argument_type, - handle->getAggregationID(), - key_id, - argument_id, - vec_tables_[i], - accessor, - derived_accesor); - } - } else { - if (argument_source == ValueAccessorSource::kBase) { - this->upsertValueAccessorDispatchHelper<true>(is_key_nullable, - is_argument_nullable, - key_type_, - argument_type, - handle->getAggregationID(), - key_id, - argument_id, - vec_tables_[i], - derived_accesor, - accessor); - } else { - this->upsertValueAccessorDispatchHelper<false>(is_key_nullable, - is_argument_nullable, - key_type_, - argument_type, - handle->getAggregationID(), - key_id, - argument_id, - vec_tables_[i], - derived_accesor, - derived_accesor); - } - } - } - }); return true; } void CollisionFreeVectorTable::finalizeKey(const std::size_t partition_id, NativeColumnVector *output_cv) const { - const std::size_t start_position = - calculatePartitionStartPosition(partition_id); - const std::size_t end_position = - calculatePartitionEndPosition(partition_id); - - switch (key_type_->getTypeID()) { - case TypeID::kInt: - finalizeKeyInternal<int>(start_position, end_position, output_cv); - return; - case TypeID::kLong: - finalizeKeyInternal<std::int64_t>(start_position, end_position, output_cv); - return; - default: - LOG(FATAL) << "Not supported"; - } } void CollisionFreeVectorTable::finalizeState(const std::size_t partition_id, const std::size_t handle_id, NativeColumnVector *output_cv) const { - const std::size_t start_position = - calculatePartitionStartPosition(partition_id); - const std::size_t end_position = - calculatePartitionEndPosition(partition_id); - - const AggregationHandle *handle = handles_[handle_id]; - const auto &argument_types = handle->getArgumentTypes(); - const Type *argument_type = - argument_types.empty() ? nullptr : argument_types.front(); - - finalizeStateDispatchHelper(handle->getAggregationID(), - argument_type, - vec_tables_[handle_id], - start_position, - end_position, - output_cv); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/38987a5c/storage/CollisionFreeVectorTable.hpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp index 772d47d..63d9615 100644 --- a/storage/CollisionFreeVectorTable.hpp +++ b/storage/CollisionFreeVectorTable.hpp @@ -29,6 +29,7 @@ #include <vector> #include "catalog/CatalogTypedefs.hpp" +#include "expressions/aggregation/AggFunc.hpp" #include "expressions/aggregation/AggregationID.hpp" #include "storage/HashTableBase.hpp" #include "storage/StorageBlob.hpp" @@ -38,6 +39,7 @@ #include "types/TypeID.hpp" #include "types/containers/ColumnVector.hpp" #include "utility/BarrieredReadWriteConcurrentBitVector.hpp" +#include "utility/BoolVector.hpp" #include "utility/Macros.hpp" #include "glog/logging.h" @@ -214,115 +216,15 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase { num_entries_); } - template <bool use_two_accessors, typename ...ArgTypes> - inline void upsertValueAccessorDispatchHelper( - const bool is_key_nullable, - const bool is_argument_nullable, - ArgTypes &&...args); - - template <bool ...bool_values, typename ...ArgTypes> - inline void upsertValueAccessorDispatchHelper( - const Type *key_type, - ArgTypes &&...args); - - template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename ...ArgTypes> - inline void upsertValueAccessorDispatchHelper( - const Type *argument_type, - const AggregationID agg_id, - ArgTypes &&...args); - - template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> - inline void upsertValueAccessorCountHelper( - const attribute_id key_attr_id, - const attribute_id argument_id, - void *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor); - - template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> - inline void upsertValueAccessorSumHelper( - const Type *argument_type, - const attribute_id key_attr_id, - const attribute_id argument_id, - void *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor); - - template <typename ...ArgTypes> - inline void upsertValueAccessorKeyOnlyHelper( - const bool is_key_nullable, - const Type *key_type, - ArgTypes &&...args); - - template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT> - inline void upsertValueAccessorKeyOnly( - const attribute_id key_attr_id, - KeyValueAccessorT *key_accessor); - - template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT> - inline void upsertValueAccessorCountNullary( - const attribute_id key_attr_id, - std::atomic<std::size_t> *vec_table, - KeyValueAccessorT *key_accessor); - - template <bool use_two_accessors, bool is_key_nullable, typename KeyT, - typename KeyValueAccessorT, typename ArgumentValueAccessorT> - inline void upsertValueAccessorCountUnary( - const attribute_id key_attr_id, - const attribute_id argument_id, - std::atomic<std::size_t> *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor); - - template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename ArgumentT, typename StateT, + template <typename KeyT, typename ArgumentT, typename AggFuncT, typename BoolVectorT, + bool is_key_nullable, bool is_argument_nullable, bool use_two_accessors, typename KeyValueAccessorT, typename ArgumentValueAccessorT> - inline void upsertValueAccessorIntegerSum( - const attribute_id key_attr_id, - const attribute_id argument_id, - std::atomic<StateT> *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor); - - template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename ArgumentT, typename StateT, - typename KeyValueAccessorT, typename ArgumentValueAccessorT> - inline void upsertValueAccessorGenericSum( - const attribute_id key_attr_id, - const attribute_id argument_id, - std::atomic<StateT> *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor); - - template <typename KeyT> - inline void finalizeKeyInternal(const std::size_t start_position, - const std::size_t end_position, - NativeColumnVector *output_cv) const; - - template <typename ...ArgTypes> - inline void finalizeStateDispatchHelper(const AggregationID agg_id, - const Type *argument_type, - const void *vec_table, - ArgTypes &&...args) const; - - template <typename ...ArgTypes> - inline void finalizeStateSumHelper(const Type *argument_type, - const void *vec_table, - ArgTypes &&...args) const; - - inline void finalizeStateCount(const std::atomic<std::size_t> *vec_table, - const std::size_t start_position, - const std::size_t end_position, - NativeColumnVector *output_cv) const; - - template <typename ResultT, typename StateT> - inline void finalizeStateSum(const std::atomic<StateT> *vec_table, - const std::size_t start_position, - const std::size_t end_position, - NativeColumnVector *output_cv) const; + inline void upsertValueAccessorUnary(const attribute_id key_attr_id, + const attribute_id argument_id, + void *vec_table, + BoolVectorT *existence_map, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor); const Type *key_type_; const std::size_t num_entries_; @@ -347,392 +249,47 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase { // ---------------------------------------------------------------------------- // Implementations of template methods follow. -template <bool use_two_accessors, typename ...ArgTypes> -inline void CollisionFreeVectorTable - ::upsertValueAccessorDispatchHelper(const bool is_key_nullable, - const bool is_argument_nullable, - ArgTypes &&...args) { - if (is_key_nullable) { - if (is_argument_nullable) { - upsertValueAccessorDispatchHelper<use_two_accessors, true, true>( - std::forward<ArgTypes>(args)...); - } else { - upsertValueAccessorDispatchHelper<use_two_accessors, true, false>( - std::forward<ArgTypes>(args)...); - } - } else { - if (is_argument_nullable) { - upsertValueAccessorDispatchHelper<use_two_accessors, false, true>( - std::forward<ArgTypes>(args)...); - } else { - upsertValueAccessorDispatchHelper<use_two_accessors, false, false>( - std::forward<ArgTypes>(args)...); - } - } -} - -template <bool ...bool_values, typename ...ArgTypes> -inline void CollisionFreeVectorTable - ::upsertValueAccessorDispatchHelper(const Type *key_type, - ArgTypes &&...args) { - switch (key_type->getTypeID()) { - case TypeID::kInt: - upsertValueAccessorDispatchHelper<bool_values..., int>( - std::forward<ArgTypes>(args)...); - return; - case TypeID::kLong: - upsertValueAccessorDispatchHelper<bool_values..., std::int64_t>( - std::forward<ArgTypes>(args)...); - return; - default: - LOG(FATAL) << "Not supported"; - } -} - -template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename ...ArgTypes> -inline void CollisionFreeVectorTable - ::upsertValueAccessorDispatchHelper(const Type *argument_type, - const AggregationID agg_id, - ArgTypes &&...args) { - switch (agg_id) { - case AggregationID::kCount: - upsertValueAccessorCountHelper< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>( - std::forward<ArgTypes>(args)...); - return; - case AggregationID::kSum: - upsertValueAccessorSumHelper< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>( - argument_type, std::forward<ArgTypes>(args)...); - return; - default: - LOG(FATAL) << "Not supported"; - } -} - -template <typename ...ArgTypes> -inline void CollisionFreeVectorTable - ::upsertValueAccessorKeyOnlyHelper(const bool is_key_nullable, - const Type *key_type, - ArgTypes &&...args) { - switch (key_type->getTypeID()) { - case TypeID::kInt: { - if (is_key_nullable) { - upsertValueAccessorKeyOnly<true, int>(std::forward<ArgTypes>(args)...); - } else { - upsertValueAccessorKeyOnly<false, int>(std::forward<ArgTypes>(args)...); - } - return; - } - case TypeID::kLong: { - if (is_key_nullable) { - upsertValueAccessorKeyOnly<true, std::int64_t>(std::forward<ArgTypes>(args)...); - } else { - upsertValueAccessorKeyOnly<false, std::int64_t>(std::forward<ArgTypes>(args)...); - } - return; - } - default: - LOG(FATAL) << "Not supported"; - } -} - -template <bool is_key_nullable, typename KeyT, typename ValueAccessorT> -inline void CollisionFreeVectorTable - ::upsertValueAccessorKeyOnly(const attribute_id key_attr_id, - ValueAccessorT *accessor) { - accessor->beginIteration(); - while (accessor->next()) { - const KeyT *key = static_cast<const KeyT *>( - accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); - if (is_key_nullable && key == nullptr) { - continue; - } - existence_map_->setBit(*key); - } -} - -template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> -inline void CollisionFreeVectorTable - ::upsertValueAccessorCountHelper(const attribute_id key_attr_id, - const attribute_id argument_id, - void *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor) { - DCHECK_GE(key_attr_id, 0); - - if (is_argument_nullable && argument_id != kInvalidAttributeID) { - upsertValueAccessorCountUnary<use_two_accessors, is_key_nullable, KeyT>( - key_attr_id, - argument_id, - static_cast<std::atomic<std::size_t> *>(vec_table), - key_accessor, - argument_accessor); - return; - } else { - upsertValueAccessorCountNullary<is_key_nullable, KeyT>( - key_attr_id, - static_cast<std::atomic<std::size_t> *>(vec_table), - key_accessor); - return; - } -} - -template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> -inline void CollisionFreeVectorTable - ::upsertValueAccessorSumHelper(const Type *argument_type, - const attribute_id key_attr_id, - const attribute_id argument_id, - void *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor) { - DCHECK_GE(key_attr_id, 0); - DCHECK_GE(argument_id, 0); - DCHECK(argument_type != nullptr); - - switch (argument_type->getTypeID()) { - case TypeID::kInt: - upsertValueAccessorIntegerSum< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, int>( - key_attr_id, - argument_id, - static_cast<std::atomic<std::int64_t> *>(vec_table), - key_accessor, - argument_accessor); - return; - case TypeID::kLong: - upsertValueAccessorIntegerSum< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, std::int64_t>( - key_attr_id, - argument_id, - static_cast<std::atomic<std::int64_t> *>(vec_table), - key_accessor, - argument_accessor); - return; - case TypeID::kFloat: - upsertValueAccessorGenericSum< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, float>( - key_attr_id, - argument_id, - static_cast<std::atomic<double> *>(vec_table), - key_accessor, - argument_accessor); - return; - case TypeID::kDouble: - upsertValueAccessorGenericSum< - use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, double>( - key_attr_id, - argument_id, - static_cast<std::atomic<double> *>(vec_table), - key_accessor, - argument_accessor); - return; - default: - LOG(FATAL) << "Not supported"; - } -} - -template <bool is_key_nullable, typename KeyT, typename ValueAccessorT> -inline void CollisionFreeVectorTable - ::upsertValueAccessorCountNullary(const attribute_id key_attr_id, - std::atomic<std::size_t> *vec_table, - ValueAccessorT *accessor) { - accessor->beginIteration(); - while (accessor->next()) { - const KeyT *key = static_cast<const KeyT *>( - accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); - if (is_key_nullable && key == nullptr) { - continue; - } - const std::size_t loc = *key; - vec_table[loc].fetch_add(1u, std::memory_order_relaxed); - existence_map_->setBit(loc); - } -} - -template <bool use_two_accessors, bool is_key_nullable, typename KeyT, +template <typename KeyT, typename ArgT, typename AggFuncT, typename BoolVectorT, + bool is_key_nullable, bool is_argument_nullable, bool use_two_accessors, typename KeyValueAccessorT, typename ArgumentValueAccessorT> inline void CollisionFreeVectorTable - ::upsertValueAccessorCountUnary(const attribute_id key_attr_id, - const attribute_id argument_id, - std::atomic<std::size_t> *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor) { - key_accessor->beginIteration(); - if (use_two_accessors) { - argument_accessor->beginIteration(); - } - while (key_accessor->next()) { - if (use_two_accessors) { - argument_accessor->next(); - } - const KeyT *key = static_cast<const KeyT *>( - key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); - if (is_key_nullable && key == nullptr) { - continue; - } - const std::size_t loc = *key; - existence_map_->setBit(loc); - if (argument_accessor->getUntypedValue(argument_id) == nullptr) { - continue; - } - vec_table[loc].fetch_add(1u, std::memory_order_relaxed); - } -} + ::upsertValueAccessorUnary(const attribute_id key_attr_id, + const attribute_id argument_id, + void *vec_table, + BoolVectorT *existence_map, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor) { + auto *states = static_cast< + typename AggFuncT::template AggState<ArgT>::AtomicT *>(vec_table); -template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename ArgumentT, typename StateT, - typename KeyValueAccessorT, typename ArgumentValueAccessorT> -inline void CollisionFreeVectorTable - ::upsertValueAccessorIntegerSum(const attribute_id key_attr_id, - const attribute_id argument_id, - std::atomic<StateT> *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor) { key_accessor->beginIteration(); if (use_two_accessors) { argument_accessor->beginIteration(); } - while (key_accessor->next()) { - if (use_two_accessors) { - argument_accessor->next(); - } - const KeyT *key = static_cast<const KeyT *>( - key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); - if (is_key_nullable && key == nullptr) { - continue; - } - const std::size_t loc = *key; - existence_map_->setBit(loc); - const ArgumentT *argument = static_cast<const ArgumentT *>( - argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id)); - if (is_argument_nullable && argument == nullptr) { - continue; - } - vec_table[loc].fetch_add(*argument, std::memory_order_relaxed); - } -} -template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, - typename KeyT, typename ArgumentT, typename StateT, - typename KeyValueAccessorT, typename ArgumentValueAccessorT> -inline void CollisionFreeVectorTable - ::upsertValueAccessorGenericSum(const attribute_id key_attr_id, - const attribute_id argument_id, - std::atomic<StateT> *vec_table, - KeyValueAccessorT *key_accessor, - ArgumentValueAccessorT *argument_accessor) { - key_accessor->beginIteration(); - if (use_two_accessors) { - argument_accessor->beginIteration(); - } while (key_accessor->next()) { if (use_two_accessors) { argument_accessor->next(); } - const KeyT *key = static_cast<const KeyT *>( + + const auto *key = static_cast<const typename KeyT::cpptype *>( key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); if (is_key_nullable && key == nullptr) { continue; } const std::size_t loc = *key; - existence_map_->setBit(loc); - const ArgumentT *argument = static_cast<const ArgumentT *>( + existence_map->setBit(loc); + + const auto *argument = static_cast<const typename ArgT::cpptype *>( argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id)); if (is_argument_nullable && argument == nullptr) { continue; } - const ArgumentT arg_val = *argument; - std::atomic<StateT> &state = vec_table[loc]; - StateT state_val = state.load(std::memory_order_relaxed); - while (!state.compare_exchange_weak(state_val, state_val + arg_val)) {} - } -} -template <typename KeyT> -inline void CollisionFreeVectorTable - ::finalizeKeyInternal(const std::size_t start_position, - const std::size_t end_position, - NativeColumnVector *output_cv) const { - std::size_t loc = start_position - 1; - while ((loc = existence_map_->nextOne(loc)) < end_position) { - *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc; + AggFuncT::template MergeArgAtomic<ArgT>(*argument, states + loc); } } -template <typename ...ArgTypes> -inline void CollisionFreeVectorTable - ::finalizeStateDispatchHelper(const AggregationID agg_id, - const Type *argument_type, - const void *vec_table, - ArgTypes &&...args) const { - switch (agg_id) { - case AggregationID::kCount: - finalizeStateCount(static_cast<const std::atomic<std::size_t> *>(vec_table), - std::forward<ArgTypes>(args)...); - return; - case AggregationID::kSum: - finalizeStateSumHelper(argument_type, - vec_table, - std::forward<ArgTypes>(args)...); - return; - default: - LOG(FATAL) << "Not supported"; - } -} - -template <typename ...ArgTypes> -inline void CollisionFreeVectorTable - ::finalizeStateSumHelper(const Type *argument_type, - const void *vec_table, - ArgTypes &&...args) const { - DCHECK(argument_type != nullptr); - - switch (argument_type->getTypeID()) { - case TypeID::kInt: // Fall through - case TypeID::kLong: - finalizeStateSum<std::int64_t>( - static_cast<const std::atomic<std::int64_t> *>(vec_table), - std::forward<ArgTypes>(args)...); - return; - case TypeID::kFloat: // Fall through - case TypeID::kDouble: - finalizeStateSum<double>( - static_cast<const std::atomic<double> *>(vec_table), - std::forward<ArgTypes>(args)...); - return; - default: - LOG(FATAL) << "Not supported"; - } -} - -inline void CollisionFreeVectorTable - ::finalizeStateCount(const std::atomic<std::size_t> *vec_table, - const std::size_t start_position, - const std::size_t end_position, - NativeColumnVector *output_cv) const { - std::size_t loc = start_position - 1; - while ((loc = existence_map_->nextOne(loc)) < end_position) { - *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) = - vec_table[loc].load(std::memory_order_relaxed); - } -} - -template <typename ResultT, typename StateT> -inline void CollisionFreeVectorTable - ::finalizeStateSum(const std::atomic<StateT> *vec_table, - const std::size_t start_position, - const std::size_t end_position, - NativeColumnVector *output_cv) const { - std::size_t loc = start_position - 1; - while ((loc = existence_map_->nextOne(loc)) < end_position) { - *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) = - vec_table[loc].load(std::memory_order_relaxed); - } -} } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/38987a5c/utility/BoolVector.hpp ---------------------------------------------------------------------- diff --git a/utility/BoolVector.hpp b/utility/BoolVector.hpp new file mode 100644 index 0000000..5bf079f --- /dev/null +++ b/utility/BoolVector.hpp @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_UTILITY_BOOL_VECTOR_HPP_ +#define QUICKSTEP_UTILITY_BOOL_VECTOR_HPP_ + +#include <atomic> +#include <cstddef> +#include <cstdint> +#include <cstdlib> +#include <cstring> +#include <limits> + +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +/** \addtogroup Utility + * @{ + */ + +class BoolVector { + public: + BoolVector(void *memory_location, + const std::size_t length, + const bool initialize) + : owned_(false), + length_(length), + data_array_(static_cast<bool *>(memory_location)) { + DCHECK_GT(length, 0u); + DCHECK(data_array_ != nullptr); + + if (initialize) { + clear(); + } + } + + explicit BoolVector(const std::size_t length) + : owned_(true), + length_(length), + data_array_(static_cast<bool *>(std::malloc(sizeof(bool) * length))) { + DCHECK_GT(length, 0u); + clear(); + } + + ~BoolVector() { + if (owned_) { + std::free(data_array_); + } + } + + inline void clear() { + std::memset(data_array_, 0, sizeof(bool) * length_); + } + + inline void set(const std::size_t loc) { + data_array_[loc] = true; + } + + inline bool get(const std::size_t loc) { + return data_array_[loc]; + } + + private: + const bool owned_; + const std::size_t length_; + bool *data_array_; + + DISALLOW_COPY_AND_ASSIGN(BoolVector); +}; + + +class BarrieredReadWriteConcurrentBoolVector { + public: + BarrieredReadWriteConcurrentBoolVector(void *memory_location, + const std::size_t length, + const bool initialize) + : owned_(false), + length_(length), + data_array_(static_cast<DataType *>(memory_location)) { + DCHECK_GT(length, 0u); + DCHECK(data_array_ != nullptr); + + if (initialize) { + clear(); + } + } + + explicit BarrieredReadWriteConcurrentBoolVector(const std::size_t length) + : owned_(true), + length_(length), + data_array_(static_cast<DataType *>(std::malloc(kDataSize * length))) { + DCHECK_GT(length, 0u); + clear(); + } + + ~BarrieredReadWriteConcurrentBoolVector() { + if (owned_) { + std::free(data_array_); + } + } + + inline void clear() { + std::memset(data_array_, 0, kDataSize * length_); + } + + inline void set(const std::size_t loc) { + data_array_[loc].store(true, std::memory_order_relaxed); + } + + inline bool get(const std::size_t loc) { + return data_array_[loc].load(std::memory_order_relaxed); + } + + private: + typedef std::atomic<bool> DataType; + static constexpr std::size_t kDataSize = sizeof(DataType); + + const bool owned_; + const std::size_t length_; + DataType *data_array_; + + DISALLOW_COPY_AND_ASSIGN(BarrieredReadWriteConcurrentBoolVector); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_UTILITY_BOOL_VECTOR_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/38987a5c/utility/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt index ca04462..65dfd0a 100644 --- a/utility/CMakeLists.txt +++ b/utility/CMakeLists.txt @@ -168,6 +168,7 @@ add_library(quickstep_utility_BloomFilter ../empty_src.cpp BloomFilter.hpp) add_library(quickstep_utility_BloomFilter_proto ${quickstep_utility_BloomFilter_proto_srcs} ${quickstep_utility_BloomFilter_proto_hdrs}) +add_library(quickstep_utility_BoolVector ../empty_src.cpp BoolVector.hpp) add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.cpp CalculateInstalledMemory.hpp) add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp) add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp) @@ -233,6 +234,8 @@ target_link_libraries(quickstep_utility_BloomFilter quickstep_utility_Macros) target_link_libraries(quickstep_utility_BloomFilter_proto ${PROTOBUF_LIBRARY}) +target_link_libraries(quickstep_utility_BoolVector + quickstep_utility_Macros) target_link_libraries(quickstep_utility_CalculateInstalledMemory glog) target_link_libraries(quickstep_utility_CheckSnprintf @@ -341,6 +344,7 @@ target_link_libraries(quickstep_utility quickstep_utility_BitVector quickstep_utility_BloomFilter quickstep_utility_BloomFilter_proto + quickstep_utility_BoolVector quickstep_utility_CalculateInstalledMemory quickstep_utility_Cast quickstep_utility_CheckSnprintf