http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleMax.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp index c2d571b..d40ae9f 100644 --- a/expressions/aggregation/AggregationHandleMax.cpp +++ b/expressions/aggregation/AggregationHandleMax.cpp @@ -38,100 +38,100 @@ namespace quickstep { class StorageManager; -AggregationHandleMax::AggregationHandleMax(const Type &type) - : type_(type), block_update_(false) { - fast_comparator_.reset( - ComparisonFactory::GetComparison(ComparisonID::kGreater) - .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion())); -} - -AggregationStateHashTableBase* AggregationHandleMax::createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector<const Type *> &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const { - return AggregationStateHashTableFactory<AggregationStateMax>::CreateResizable( - hash_table_impl, group_by_types, estimated_num_groups, storage_manager); -} - -AggregationState* AggregationHandleMax::accumulateColumnVectors( - const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const { - DCHECK_EQ(1u, column_vectors.size()) - << "Got wrong number of ColumnVectors for MAX: " << column_vectors.size(); - - return new AggregationStateMax(fast_comparator_->accumulateColumnVector( - type_.getNullableVersion().makeNullValue(), *column_vectors.front())); -} - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION -AggregationState* AggregationHandleMax::accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector<attribute_id> &accessor_ids) const { - DCHECK_EQ(1u, accessor_ids.size()) - << "Got wrong number of attributes for MAX: " << accessor_ids.size(); - - return new AggregationStateMax(fast_comparator_->accumulateValueAccessor( - type_.getNullableVersion().makeNullValue(), - accessor, - accessor_ids.front())); -} -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - -void AggregationHandleMax::aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &argument_ids, - const std::vector<attribute_id> &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const { - DCHECK_EQ(1u, argument_ids.size()) - << "Got wrong number of arguments for MAX: " << argument_ids.size(); -} - -void AggregationHandleMax::mergeStates(const AggregationState &source, - AggregationState *destination) const { - const AggregationStateMax &max_source = - static_cast<const AggregationStateMax &>(source); - AggregationStateMax *max_destination = - static_cast<AggregationStateMax *>(destination); - - if (!max_source.max_.isNull()) { - compareAndUpdate(max_destination, max_source.max_); - } -} - -void AggregationHandleMax::mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const { - const TypedValue *src_max_ptr = reinterpret_cast<const TypedValue *>(source); - TypedValue *dst_max_ptr = reinterpret_cast<TypedValue *>(destination); - if (!(src_max_ptr->isNull())) { - compareAndUpdateFast(dst_max_ptr, *src_max_ptr); - } -} - -ColumnVector* AggregationHandleMax::finalizeHashTable( - const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys, - int index) const { - return finalizeHashTableHelperFast<AggregationHandleMax, - AggregationStateFastHashTable>( - type_.getNullableVersion(), hash_table, group_by_keys, index); -} - -AggregationState* -AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) const { - return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< - AggregationHandleMax, - AggregationStateMax>(distinctify_hash_table); -} - -void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const { - aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< - AggregationHandleMax, - AggregationStateFastHashTable>( - distinctify_hash_table, aggregation_hash_table, index); -} +AggregationHandleMax::AggregationHandleMax(const Type &type) {} +// : type_(type), block_update_(false) { +// fast_comparator_.reset( +// ComparisonFactory::GetComparison(ComparisonID::kGreater) +// .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion())); +//} +// +//AggregationStateHashTableBase* AggregationHandleMax::createGroupByHashTable( +// const HashTableImplType hash_table_impl, +// const std::vector<const Type *> &group_by_types, +// const std::size_t estimated_num_groups, +// StorageManager *storage_manager) const { +// return AggregationStateHashTableFactory<AggregationStateMax>::CreateResizable( +// hash_table_impl, group_by_types, estimated_num_groups, storage_manager); +//} +// +//AggregationState* AggregationHandleMax::accumulateColumnVectors( +// const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const { +// DCHECK_EQ(1u, column_vectors.size()) +// << "Got wrong number of ColumnVectors for MAX: " << column_vectors.size(); +// +// return new AggregationStateMax(fast_comparator_->accumulateColumnVector( +// type_.getNullableVersion().makeNullValue(), *column_vectors.front())); +//} +// +//#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION +//AggregationState* AggregationHandleMax::accumulateValueAccessor( +// ValueAccessor *accessor, +// const std::vector<attribute_id> &accessor_ids) const { +// DCHECK_EQ(1u, accessor_ids.size()) +// << "Got wrong number of attributes for MAX: " << accessor_ids.size(); +// +// return new AggregationStateMax(fast_comparator_->accumulateValueAccessor( +// type_.getNullableVersion().makeNullValue(), +// accessor, +// accessor_ids.front())); +//} +//#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION +// +//void AggregationHandleMax::aggregateValueAccessorIntoHashTable( +// ValueAccessor *accessor, +// const std::vector<attribute_id> &argument_ids, +// const std::vector<attribute_id> &group_by_key_ids, +// AggregationStateHashTableBase *hash_table) const { +// DCHECK_EQ(1u, argument_ids.size()) +// << "Got wrong number of arguments for MAX: " << argument_ids.size(); +//} +// +//void AggregationHandleMax::mergeStates(const AggregationState &source, +// AggregationState *destination) const { +// const AggregationStateMax &max_source = +// static_cast<const AggregationStateMax &>(source); +// AggregationStateMax *max_destination = +// static_cast<AggregationStateMax *>(destination); +// +// if (!max_source.max_.isNull()) { +// compareAndUpdate(max_destination, max_source.max_); +// } +//} +// +//void AggregationHandleMax::mergeStatesFast(const std::uint8_t *source, +// std::uint8_t *destination) const { +// const TypedValue *src_max_ptr = reinterpret_cast<const TypedValue *>(source); +// TypedValue *dst_max_ptr = reinterpret_cast<TypedValue *>(destination); +// if (!(src_max_ptr->isNull())) { +// compareAndUpdateFast(dst_max_ptr, *src_max_ptr); +// } +//} +// +//ColumnVector* AggregationHandleMax::finalizeHashTable( +// const AggregationStateHashTableBase &hash_table, +// std::vector<std::vector<TypedValue>> *group_by_keys, +// int index) const { +// return finalizeHashTableHelperFast<AggregationHandleMax, +// AggregationStateFastHashTable>( +// type_.getNullableVersion(), hash_table, group_by_keys, index); +//} +// +//AggregationState* +//AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle( +// const AggregationStateHashTableBase &distinctify_hash_table) const { +// return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< +// AggregationHandleMax, +// AggregationStateMax>(distinctify_hash_table); +//} +// +//void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy( +// const AggregationStateHashTableBase &distinctify_hash_table, +// AggregationStateHashTableBase *aggregation_hash_table, +// std::size_t index) const { +// aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< +// AggregationHandleMax, +// AggregationStateFastHashTable>( +// distinctify_hash_table, aggregation_hash_table, index); +//} } // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleMax.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp index 5fb9f44..effc38f 100644 --- a/expressions/aggregation/AggregationHandleMax.hpp +++ b/expressions/aggregation/AggregationHandleMax.hpp @@ -26,9 +26,7 @@ #include <vector> #include "catalog/CatalogTypedefs.hpp" -#include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" -#include "storage/FastHashTable.hpp" #include "storage/HashTableBase.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" @@ -49,149 +47,12 @@ class ValueAccessor; */ /** - * @brief Aggregation state for max. - */ -class AggregationStateMax : public AggregationState { - public: - /** - * @brief Copy constructor (ignores mutex). - */ - AggregationStateMax(const AggregationStateMax &orig) : max_(orig.max_) {} - - /** - * @brief Destructor. - */ - ~AggregationStateMax() override{}; - - const std::uint8_t* getPayloadAddress() const { - return reinterpret_cast<const uint8_t *>(&max_); - } - - private: - friend class AggregationHandleMax; - - explicit AggregationStateMax(const Type &type) - : max_(type.getNullableVersion().makeNullValue()) {} - - explicit AggregationStateMax(TypedValue &&value) : max_(std::move(value)) {} - - TypedValue max_; - SpinMutex mutex_; -}; - -/** * @brief An aggregationhandle for max. **/ -class AggregationHandleMax : public AggregationConcreteHandle { +class AggregationHandleMax : public AggregationHandle { public: ~AggregationHandleMax() override {} - AggregationState* createInitialState() const override { - return new AggregationStateMax(type_); - } - - AggregationStateHashTableBase* createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector<const Type *> &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const override; - - /** - * @brief Iterate with max aggregation state. - */ - inline void iterateUnaryInl(AggregationStateMax *state, - const TypedValue &value) const { - DCHECK(value.isPlausibleInstanceOf(type_.getSignature())); - compareAndUpdate(static_cast<AggregationStateMax *>(state), value); - } - - inline void iterateUnaryInlFast(const TypedValue &value, - std::uint8_t *byte_ptr) const { - DCHECK(value.isPlausibleInstanceOf(type_.getSignature())); - TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr); - compareAndUpdateFast(max_ptr, value); - } - - inline void updateStateUnary(const TypedValue &argument, - std::uint8_t *byte_ptr) const override { - if (!block_update_) { - iterateUnaryInlFast(argument, byte_ptr); - } - } - - void blockUpdate() override { block_update_ = true; } - - void allowUpdate() override { block_update_ = false; } - - void initPayload(std::uint8_t *byte_ptr) const override { - TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr); - TypedValue t1 = (type_.getNullableVersion().makeNullValue()); - *max_ptr = t1; - } - - AggregationState* accumulateColumnVectors( - const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) - const override; - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - AggregationState* accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector<attribute_id> &accessor_ids) const override; -#endif - - void aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &argument_ids, - const std::vector<attribute_id> &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const override; - - void mergeStates(const AggregationState &source, - AggregationState *destination) const override; - - void mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const override; - - TypedValue finalize(const AggregationState &state) const override { - return TypedValue(static_cast<const AggregationStateMax &>(state).max_); - } - - inline TypedValue finalizeHashTableEntry( - const AggregationState &state) const { - return TypedValue(static_cast<const AggregationStateMax &>(state).max_); - } - - inline TypedValue finalizeHashTableEntryFast( - const std::uint8_t *byte_ptr) const { - const TypedValue *max_ptr = reinterpret_cast<const TypedValue *>(byte_ptr); - return TypedValue(*max_ptr); - } - - ColumnVector* finalizeHashTable( - const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys, - int index) const override; - - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForSingle() - * for MAX aggregation. - */ - AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) - const override; - - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() - * for MAX aggregation. - */ - void aggregateOnDistinctifyHashTableForGroupBy( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const override; - - std::size_t getPayloadSize() const override { return sizeof(TypedValue); } - private: friend class AggregateFunctionMax; @@ -202,37 +63,8 @@ class AggregationHandleMax : public AggregationConcreteHandle { **/ explicit AggregationHandleMax(const Type &type); - /** - * @brief compare the value with max_ and update it if the value is larger - * than current maximum. NULLs are ignored. - * - * @param value A TypedValue to compare - **/ - inline void compareAndUpdate(AggregationStateMax *state, - const TypedValue &value) const { - // TODO(chasseur): Avoid null-checks when aggregating a non-nullable Type. - if (value.isNull()) return; - - SpinMutexLock lock(state->mutex_); - if (state->max_.isNull() || - fast_comparator_->compareTypedValues(value, state->max_)) { - state->max_ = value; - } - } - - inline void compareAndUpdateFast(TypedValue *max_ptr, - const TypedValue &value) const { - if (value.isNull()) return; - if (max_ptr->isNull() || - fast_comparator_->compareTypedValues(value, *max_ptr)) { - *max_ptr = value; - } - } - - const Type &type_; - std::unique_ptr<UncheckedComparator> fast_comparator_; - - bool block_update_; +// const Type &type_; +// std::unique_ptr<UncheckedComparator> fast_comparator_; DISALLOW_COPY_AND_ASSIGN(AggregationHandleMax); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleMin.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp index a07f299..4765c93 100644 --- a/expressions/aggregation/AggregationHandleMin.cpp +++ b/expressions/aggregation/AggregationHandleMin.cpp @@ -38,101 +38,101 @@ namespace quickstep { class StorageManager; -AggregationHandleMin::AggregationHandleMin(const Type &type) - : type_(type), block_update_(false) { - fast_comparator_.reset( - ComparisonFactory::GetComparison(ComparisonID::kLess) - .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion())); -} - -AggregationStateHashTableBase* AggregationHandleMin::createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector<const Type *> &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const { - return AggregationStateHashTableFactory<AggregationStateMin>::CreateResizable( - hash_table_impl, group_by_types, estimated_num_groups, storage_manager); -} - -AggregationState* AggregationHandleMin::accumulateColumnVectors( - const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const { - DCHECK_EQ(1u, column_vectors.size()) - << "Got wrong number of ColumnVectors for MIN: " << column_vectors.size(); - - return new AggregationStateMin(fast_comparator_->accumulateColumnVector( - type_.getNullableVersion().makeNullValue(), *column_vectors.front())); -} - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION -AggregationState* AggregationHandleMin::accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector<attribute_id> &accessor_ids) const { - DCHECK_EQ(1u, accessor_ids.size()) - << "Got wrong number of attributes for MIN: " << accessor_ids.size(); - - return new AggregationStateMin(fast_comparator_->accumulateValueAccessor( - type_.getNullableVersion().makeNullValue(), - accessor, - accessor_ids.front())); -} -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - -void AggregationHandleMin::aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &argument_ids, - const std::vector<attribute_id> &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const { - DCHECK_EQ(1u, argument_ids.size()) - << "Got wrong number of arguments for MIN: " << argument_ids.size(); -} - -void AggregationHandleMin::mergeStates(const AggregationState &source, - AggregationState *destination) const { - const AggregationStateMin &min_source = - static_cast<const AggregationStateMin &>(source); - AggregationStateMin *min_destination = - static_cast<AggregationStateMin *>(destination); - - if (!min_source.min_.isNull()) { - compareAndUpdate(min_destination, min_source.min_); - } -} - -void AggregationHandleMin::mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const { - const TypedValue *src_min_ptr = reinterpret_cast<const TypedValue *>(source); - TypedValue *dst_min_ptr = reinterpret_cast<TypedValue *>(destination); - - if (!(src_min_ptr->isNull())) { - compareAndUpdateFast(dst_min_ptr, *src_min_ptr); - } -} - -ColumnVector* AggregationHandleMin::finalizeHashTable( - const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys, - int index) const { - return finalizeHashTableHelperFast<AggregationHandleMin, - AggregationStateFastHashTable>( - type_.getNonNullableVersion(), hash_table, group_by_keys, index); -} - -AggregationState* -AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) const { - return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< - AggregationHandleMin, - AggregationStateMin>(distinctify_hash_table); -} - -void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const { - aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< - AggregationHandleMin, - AggregationStateFastHashTable>( - distinctify_hash_table, aggregation_hash_table, index); -} +AggregationHandleMin::AggregationHandleMin(const Type &type) {} +// : type_(type), block_update_(false) { +// fast_comparator_.reset( +// ComparisonFactory::GetComparison(ComparisonID::kLess) +// .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion())); +//} +// +//AggregationStateHashTableBase* AggregationHandleMin::createGroupByHashTable( +// const HashTableImplType hash_table_impl, +// const std::vector<const Type *> &group_by_types, +// const std::size_t estimated_num_groups, +// StorageManager *storage_manager) const { +// return AggregationStateHashTableFactory<AggregationStateMin>::CreateResizable( +// hash_table_impl, group_by_types, estimated_num_groups, storage_manager); +//} +// +//AggregationState* AggregationHandleMin::accumulateColumnVectors( +// const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const { +// DCHECK_EQ(1u, column_vectors.size()) +// << "Got wrong number of ColumnVectors for MIN: " << column_vectors.size(); +// +// return new AggregationStateMin(fast_comparator_->accumulateColumnVector( +// type_.getNullableVersion().makeNullValue(), *column_vectors.front())); +//} +// +//#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION +//AggregationState* AggregationHandleMin::accumulateValueAccessor( +// ValueAccessor *accessor, +// const std::vector<attribute_id> &accessor_ids) const { +// DCHECK_EQ(1u, accessor_ids.size()) +// << "Got wrong number of attributes for MIN: " << accessor_ids.size(); +// +// return new AggregationStateMin(fast_comparator_->accumulateValueAccessor( +// type_.getNullableVersion().makeNullValue(), +// accessor, +// accessor_ids.front())); +//} +//#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION +// +//void AggregationHandleMin::aggregateValueAccessorIntoHashTable( +// ValueAccessor *accessor, +// const std::vector<attribute_id> &argument_ids, +// const std::vector<attribute_id> &group_by_key_ids, +// AggregationStateHashTableBase *hash_table) const { +// DCHECK_EQ(1u, argument_ids.size()) +// << "Got wrong number of arguments for MIN: " << argument_ids.size(); +//} +// +//void AggregationHandleMin::mergeStates(const AggregationState &source, +// AggregationState *destination) const { +// const AggregationStateMin &min_source = +// static_cast<const AggregationStateMin &>(source); +// AggregationStateMin *min_destination = +// static_cast<AggregationStateMin *>(destination); +// +// if (!min_source.min_.isNull()) { +// compareAndUpdate(min_destination, min_source.min_); +// } +//} +// +//void AggregationHandleMin::mergeStatesFast(const std::uint8_t *source, +// std::uint8_t *destination) const { +// const TypedValue *src_min_ptr = reinterpret_cast<const TypedValue *>(source); +// TypedValue *dst_min_ptr = reinterpret_cast<TypedValue *>(destination); +// +// if (!(src_min_ptr->isNull())) { +// compareAndUpdateFast(dst_min_ptr, *src_min_ptr); +// } +//} +// +//ColumnVector* AggregationHandleMin::finalizeHashTable( +// const AggregationStateHashTableBase &hash_table, +// std::vector<std::vector<TypedValue>> *group_by_keys, +// int index) const { +// return finalizeHashTableHelperFast<AggregationHandleMin, +// AggregationStateFastHashTable>( +// type_.getNonNullableVersion(), hash_table, group_by_keys, index); +//} +// +//AggregationState* +//AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle( +// const AggregationStateHashTableBase &distinctify_hash_table) const { +// return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< +// AggregationHandleMin, +// AggregationStateMin>(distinctify_hash_table); +//} +// +//void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy( +// const AggregationStateHashTableBase &distinctify_hash_table, +// AggregationStateHashTableBase *aggregation_hash_table, +// std::size_t index) const { +// aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< +// AggregationHandleMin, +// AggregationStateFastHashTable>( +// distinctify_hash_table, aggregation_hash_table, index); +//} } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleMin.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp index 173911d..64fddea 100644 --- a/expressions/aggregation/AggregationHandleMin.hpp +++ b/expressions/aggregation/AggregationHandleMin.hpp @@ -26,11 +26,8 @@ #include <vector> #include "catalog/CatalogTypedefs.hpp" -#include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" -#include "storage/FastHashTable.hpp" #include "storage/HashTableBase.hpp" -#include "threading/SpinMutex.hpp" #include "types/Type.hpp" #include "types/TypedValue.hpp" #include "types/operations/comparisons/Comparison.hpp" @@ -49,151 +46,12 @@ class ValueAccessor; */ /** - * @brief Aggregation state for min. - */ -class AggregationStateMin : public AggregationState { - public: - /** - * @brief Copy constructor (ignores mutex). - */ - AggregationStateMin(const AggregationStateMin &orig) : min_(orig.min_) {} - - /** - * @brief Destructor. - */ - ~AggregationStateMin() override {} - - std::size_t getPayloadSize() const { return sizeof(TypedValue); } - - const std::uint8_t *getPayloadAddress() const { - return reinterpret_cast<const uint8_t *>(&min_); - } - - private: - friend class AggregationHandleMin; - - explicit AggregationStateMin(const Type &type) - : min_(type.getNullableVersion().makeNullValue()) {} - - explicit AggregationStateMin(TypedValue &&value) : min_(std::move(value)) {} - - TypedValue min_; - SpinMutex mutex_; -}; - -/** * @brief An aggregationhandle for min. **/ -class AggregationHandleMin : public AggregationConcreteHandle { +class AggregationHandleMin : public AggregationHandle { public: ~AggregationHandleMin() override {} - AggregationState* createInitialState() const override { - return new AggregationStateMin(type_); - } - - AggregationStateHashTableBase* createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector<const Type *> &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const override; - - /** - * @brief Iterate with min aggregation state. - */ - inline void iterateUnaryInl(AggregationStateMin *state, - const TypedValue &value) const { - DCHECK(value.isPlausibleInstanceOf(type_.getSignature())); - compareAndUpdate(state, value); - } - - inline void iterateUnaryInlFast(const TypedValue &value, - std::uint8_t *byte_ptr) const { - DCHECK(value.isPlausibleInstanceOf(type_.getSignature())); - TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr); - compareAndUpdateFast(min_ptr, value); - } - - inline void updateStateUnary(const TypedValue &argument, - std::uint8_t *byte_ptr) const override { - if (!block_update_) { - iterateUnaryInlFast(argument, byte_ptr); - } - } - - void blockUpdate() override { block_update_ = true; } - - void allowUpdate() override { block_update_ = false; } - - void initPayload(std::uint8_t *byte_ptr) const override { - TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr); - TypedValue t1 = (type_.getNullableVersion().makeNullValue()); - *min_ptr = t1; - } - - AggregationState* accumulateColumnVectors( - const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) - const override; - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - AggregationState* accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector<attribute_id> &accessor_ids) const override; -#endif - - void aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &argument_ids, - const std::vector<attribute_id> &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const override; - - void mergeStates(const AggregationState &source, - AggregationState *destination) const override; - - void mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const override; - - TypedValue finalize(const AggregationState &state) const override { - return static_cast<const AggregationStateMin &>(state).min_; - } - - inline TypedValue finalizeHashTableEntry( - const AggregationState &state) const { - return static_cast<const AggregationStateMin &>(state).min_; - } - - inline TypedValue finalizeHashTableEntryFast( - const std::uint8_t *byte_ptr) const { - const TypedValue *min_ptr = reinterpret_cast<const TypedValue *>(byte_ptr); - return TypedValue(*min_ptr); - } - - ColumnVector* finalizeHashTable( - const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys, - int index) const override; - - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForSingle() - * for MIN aggregation. - */ - AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) - const override; - - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() - * for MIN aggregation. - */ - void aggregateOnDistinctifyHashTableForGroupBy( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const override; - - std::size_t getPayloadSize() const override { return sizeof(TypedValue); } - private: friend class AggregateFunctionMin; @@ -204,36 +62,8 @@ class AggregationHandleMin : public AggregationConcreteHandle { **/ explicit AggregationHandleMin(const Type &type); - /** - * @brief compare the value with min_ and update it if the value is smaller - * than current minimum. NULLs are ignored. - * - * @param value A TypedValue to compare. - **/ - inline void compareAndUpdate(AggregationStateMin *state, - const TypedValue &value) const { - if (value.isNull()) return; - - SpinMutexLock lock(state->mutex_); - if (state->min_.isNull() || - fast_comparator_->compareTypedValues(value, state->min_)) { - state->min_ = value; - } - } - - inline void compareAndUpdateFast(TypedValue *min_ptr, - const TypedValue &value) const { - if (value.isNull()) return; - if (min_ptr->isNull() || - fast_comparator_->compareTypedValues(value, *min_ptr)) { - *min_ptr = value; - } - } - - const Type &type_; - std::unique_ptr<UncheckedComparator> fast_comparator_; - - bool block_update_; +// const Type &type_; +// std::unique_ptr<UncheckedComparator> fast_comparator_; DISALLOW_COPY_AND_ASSIGN(AggregationHandleMin); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleSum.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp index 642d88d..4e77ed0 100644 --- a/expressions/aggregation/AggregationHandleSum.cpp +++ b/expressions/aggregation/AggregationHandleSum.cpp @@ -20,6 +20,7 @@ #include "expressions/aggregation/AggregationHandleSum.hpp" #include <cstddef> +#include <cstring> #include <memory> #include <utility> #include <vector> @@ -35,6 +36,7 @@ #include "types/operations/binary_operations/BinaryOperation.hpp" #include "types/operations/binary_operations/BinaryOperationFactory.hpp" #include "types/operations/binary_operations/BinaryOperationID.hpp" +#include "types/TypeFunctors.hpp" #include "glog/logging.h" @@ -42,12 +44,11 @@ namespace quickstep { class StorageManager; -AggregationHandleSum::AggregationHandleSum(const Type &type) - : argument_type_(type), block_update_(false) { +AggregationHandleSum::AggregationHandleSum(const Type &argument_type) { // We sum Int as Long and Float as Double so that we have more headroom when // adding many values. TypeID type_precision_id; - switch (argument_type_.getTypeID()) { + switch (argument_type.getTypeID()) { case kInt: case kLong: type_precision_id = kLong; @@ -57,134 +58,57 @@ AggregationHandleSum::AggregationHandleSum(const Type &type) type_precision_id = kDouble; break; default: - type_precision_id = type.getTypeID(); + type_precision_id = argument_type.getTypeID(); break; } const Type &sum_type = TypeFactory::GetType(type_precision_id); - blank_state_.sum_ = sum_type.makeZeroValue(); + state_size_ = sum_type.maximumByteLength(); + blank_state_.reset(state_size_, false); + + tv_blank_state_ = sum_type.makeZeroValue(); // Make operators to do arithmetic: // Add operator for summing argument values. - fast_operator_.reset( + accumulate_operator_.reset( BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd) - .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type_)); + .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type)); + accumulate_functor_ = accumulate_operator_->getMergeFunctor(); + // Add operator for merging states. merge_operator_.reset( BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd) .makeUncheckedBinaryOperatorForTypes(sum_type, sum_type)); + merge_functor_ = merge_operator_->getMergeFunctor(); - // Result is nullable, because SUM() over 0 values (or all NULL values) is - // NULL. - result_type_ = &sum_type.getNullableVersion(); + finalize_functor_ = MakeUntypedCopyFunctor(&sum_type); + result_type_ = &sum_type; } -AggregationStateHashTableBase* AggregationHandleSum::createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector<const Type *> &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const { - return AggregationStateHashTableFactory<AggregationStateSum>::CreateResizable( - hash_table_impl, group_by_types, estimated_num_groups, storage_manager); -} - -AggregationState* AggregationHandleSum::accumulateColumnVectors( +void AggregationHandleSum::accumulateColumnVectors( + void *state, const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const { DCHECK_EQ(1u, column_vectors.size()) << "Got wrong number of ColumnVectors for SUM: " << column_vectors.size(); std::size_t num_tuples = 0; - TypedValue cv_sum = fast_operator_->accumulateColumnVector( - blank_state_.sum_, *column_vectors.front(), &num_tuples); - return new AggregationStateSum(std::move(cv_sum), num_tuples == 0); + TypedValue cv_sum = accumulate_operator_->accumulateColumnVector( + tv_blank_state_, *column_vectors.front(), &num_tuples); + cv_sum.copyInto(state); } #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION -AggregationState* AggregationHandleSum::accumulateValueAccessor( +void AggregationHandleSum::accumulateValueAccessor( + void *state, ValueAccessor *accessor, const std::vector<attribute_id> &accessor_ids) const { DCHECK_EQ(1u, accessor_ids.size()) << "Got wrong number of attributes for SUM: " << accessor_ids.size(); std::size_t num_tuples = 0; - TypedValue va_sum = fast_operator_->accumulateValueAccessor( - blank_state_.sum_, accessor, accessor_ids.front(), &num_tuples); - return new AggregationStateSum(std::move(va_sum), num_tuples == 0); + TypedValue va_sum = accumulate_operator_->accumulateValueAccessor( + tv_blank_state_, accessor, accessor_ids.front(), &num_tuples); + va_sum.copyInto(state); } #endif -void AggregationHandleSum::aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &argument_ids, - const std::vector<attribute_id> &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const { - DCHECK_EQ(1u, argument_ids.size()) - << "Got wrong number of arguments for SUM: " << argument_ids.size(); -} - -void AggregationHandleSum::mergeStates(const AggregationState &source, - AggregationState *destination) const { - const AggregationStateSum &sum_source = - static_cast<const AggregationStateSum &>(source); - AggregationStateSum *sum_destination = - static_cast<AggregationStateSum *>(destination); - - SpinMutexLock lock(sum_destination->mutex_); - sum_destination->sum_ = merge_operator_->applyToTypedValues( - sum_destination->sum_, sum_source.sum_); - sum_destination->null_ = sum_destination->null_ && sum_source.null_; -} - -void AggregationHandleSum::mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const { - const TypedValue *src_sum_ptr = - reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_); - const bool *src_null_ptr = - reinterpret_cast<const bool *>(source + blank_state_.null_offset_); - TypedValue *dst_sum_ptr = - reinterpret_cast<TypedValue *>(destination + blank_state_.sum_offset_); - bool *dst_null_ptr = - reinterpret_cast<bool *>(destination + blank_state_.null_offset_); - *dst_sum_ptr = - merge_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr); - *dst_null_ptr = (*dst_null_ptr) && (*src_null_ptr); -} - -TypedValue AggregationHandleSum::finalize(const AggregationState &state) const { - const AggregationStateSum &agg_state = - static_cast<const AggregationStateSum &>(state); - if (agg_state.null_) { - // SUM() over no values is NULL. - return result_type_->makeNullValue(); - } else { - return agg_state.sum_; - } -} - -ColumnVector* AggregationHandleSum::finalizeHashTable( - const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys, - int index) const { - return finalizeHashTableHelperFast<AggregationHandleSum, - AggregationStateFastHashTable>( - *result_type_, hash_table, group_by_keys, index); -} - -AggregationState* -AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) const { - return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< - AggregationHandleSum, - AggregationStateSum>(distinctify_hash_table); -} - -void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const { - aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< - AggregationHandleSum, - AggregationStateFastHashTable>( - distinctify_hash_table, aggregation_hash_table, index); -} - } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/AggregationHandleSum.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp index 6c334a6..f45e87e 100644 --- a/expressions/aggregation/AggregationHandleSum.hpp +++ b/expressions/aggregation/AggregationHandleSum.hpp @@ -26,198 +26,39 @@ #include <vector> #include "catalog/CatalogTypedefs.hpp" -#include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" -#include "storage/FastHashTable.hpp" -#include "storage/HashTableBase.hpp" -#include "threading/SpinMutex.hpp" #include "types/Type.hpp" #include "types/TypedValue.hpp" #include "types/operations/binary_operations/BinaryOperation.hpp" +#include "utility/ScopedBuffer.hpp" #include "utility/Macros.hpp" #include "glog/logging.h" namespace quickstep { -class ColumnVector; -class StorageManager; -class ValueAccessor; - /** \addtogroup Expressions * @{ */ /** - * @brief Aggregation state for sum. - */ -class AggregationStateSum : public AggregationState { - public: - /** - * @brief Copy constructor (ignores mutex). - */ - AggregationStateSum(const AggregationStateSum &orig) - : sum_(orig.sum_), - null_(orig.null_), - sum_offset_(orig.sum_offset_), - null_offset_(orig.null_offset_) {} - - std::size_t getPayloadSize() const { - std::size_t p1 = reinterpret_cast<std::size_t>(&sum_); - std::size_t p2 = reinterpret_cast<std::size_t>(&mutex_); - return (p2 - p1); - } - - const std::uint8_t* getPayloadAddress() const { - return reinterpret_cast<const uint8_t *>(&sum_); - } - - private: - friend class AggregationHandleSum; - - AggregationStateSum() - : sum_(0), - null_(true), - sum_offset_(0), - null_offset_(reinterpret_cast<std::uint8_t *>(&null_) - - reinterpret_cast<std::uint8_t *>(&sum_)) {} - - AggregationStateSum(TypedValue &&sum, const bool is_null) - : sum_(std::move(sum)), null_(is_null) {} - - // TODO(shoban): We might want to specialize sum_ to use atomics for int types - // similar to in AggregationStateCount. - TypedValue sum_; - bool null_; - SpinMutex mutex_; - - int sum_offset_, null_offset_; -}; - -/** * @brief An aggregationhandle for sum. **/ -class AggregationHandleSum : public AggregationConcreteHandle { +class AggregationHandleSum : public AggregationHandle { public: ~AggregationHandleSum() override {} - AggregationState* createInitialState() const override { - return new AggregationStateSum(blank_state_); - } - - AggregationStateHashTableBase* createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector<const Type *> &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const override; - - inline void iterateUnaryInl(AggregationStateSum *state, - const TypedValue &value) const { - DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature())); - if (value.isNull()) return; - - SpinMutexLock lock(state->mutex_); - state->sum_ = fast_operator_->applyToTypedValues(state->sum_, value); - state->null_ = false; - } - - inline void iterateUnaryInlFast(const TypedValue &value, - std::uint8_t *byte_ptr) const { - DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature())); - if (value.isNull()) return; - TypedValue *sum_ptr = - reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_); - bool *null_ptr = - reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_); - *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, value); - *null_ptr = false; - } - - inline void updateStateUnary(const TypedValue &argument, - std::uint8_t *byte_ptr) const override { - if (!block_update_) { - iterateUnaryInlFast(argument, byte_ptr); - } - } - - void blockUpdate() override { block_update_ = true; } - - void allowUpdate() override { block_update_ = false; } - - void initPayload(std::uint8_t *byte_ptr) const override { - TypedValue *sum_ptr = - reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_); - bool *null_ptr = - reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_); - *sum_ptr = blank_state_.sum_; - *null_ptr = true; - } - - AggregationState* accumulateColumnVectors( - const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) - const override; + void accumulateColumnVectors( + void *state, + const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override; #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - AggregationState* accumulateValueAccessor( + void accumulateValueAccessor( + void *state, ValueAccessor *accessor, - const std::vector<attribute_id> &accessor_id) const override; + const std::vector<attribute_id> &accessor_ids) const override; #endif - void aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &argument_ids, - const std::vector<attribute_id> &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const override; - - void mergeStates(const AggregationState &source, - AggregationState *destination) const override; - - void mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const override; - - TypedValue finalize(const AggregationState &state) const override; - - inline TypedValue finalizeHashTableEntry( - const AggregationState &state) const { - return static_cast<const AggregationStateSum &>(state).sum_; - } - - inline TypedValue finalizeHashTableEntryFast( - const std::uint8_t *byte_ptr) const { - std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr); - TypedValue *sum_ptr = - reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset_); - return *sum_ptr; - } - - ColumnVector* finalizeHashTable( - const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys, - int index) const override; - - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForSingle() - * for SUM aggregation. - */ - AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) - const override; - - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() - * for SUM aggregation. - */ - void aggregateOnDistinctifyHashTableForGroupBy( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const override; - - std::size_t getPayloadSize() const override { - return blank_state_.getPayloadSize(); - } - private: friend class AggregateFunctionSum; @@ -226,15 +67,13 @@ class AggregationHandleSum : public AggregationConcreteHandle { * * @param type Type of the sum value. **/ - explicit AggregationHandleSum(const Type &type); + explicit AggregationHandleSum(const Type &argument_type); - const Type &argument_type_; - const Type *result_type_; - AggregationStateSum blank_state_; - std::unique_ptr<UncheckedBinaryOperator> fast_operator_; - std::unique_ptr<UncheckedBinaryOperator> merge_operator_; + // TODO: temporary + TypedValue tv_blank_state_; - bool block_update_; + std::unique_ptr<UncheckedBinaryOperator> accumulate_operator_; + std::unique_ptr<UncheckedBinaryOperator> merge_operator_; DISALLOW_COPY_AND_ASSIGN(AggregationHandleSum); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/expressions/aggregation/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt index e9503f7..7b369ae 100644 --- a/expressions/aggregation/CMakeLists.txt +++ b/expressions/aggregation/CMakeLists.txt @@ -43,9 +43,6 @@ add_library(quickstep_expressions_aggregation_AggregateFunctionMin add_library(quickstep_expressions_aggregation_AggregateFunctionSum AggregateFunctionSum.cpp AggregateFunctionSum.hpp) -add_library(quickstep_expressions_aggregation_AggregationConcreteHandle - AggregationConcreteHandle.cpp - AggregationConcreteHandle.hpp) add_library(quickstep_expressions_aggregation_AggregationHandle ../../empty_src.cpp AggregationHandle.hpp) @@ -55,9 +52,6 @@ add_library(quickstep_expressions_aggregation_AggregationHandleAvg add_library(quickstep_expressions_aggregation_AggregationHandleCount AggregationHandleCount.cpp AggregationHandleCount.hpp) -add_library(quickstep_expressions_aggregation_AggregationHandleDistinct - AggregationHandleDistinct.cpp - AggregationHandleDistinct.hpp) add_library(quickstep_expressions_aggregation_AggregationHandleMax AggregationHandleMax.cpp AggregationHandleMax.hpp) @@ -142,34 +136,20 @@ target_link_libraries(quickstep_expressions_aggregation_AggregateFunctionSum quickstep_types_operations_binaryoperations_BinaryOperationFactory quickstep_types_operations_binaryoperations_BinaryOperationID quickstep_utility_Macros) -target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandle - glog - quickstep_catalog_CatalogTypedefs - quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_HashTable - quickstep_storage_HashTableBase - quickstep_storage_HashTableFactory - quickstep_threading_SpinMutex - quickstep_types_TypedValue - quickstep_types_containers_ColumnVector - quickstep_utility_Macros) target_link_libraries(quickstep_expressions_aggregation_AggregationHandle glog quickstep_catalog_CatalogTypedefs - quickstep_storage_HashTableBase + quickstep_types_Type quickstep_types_TypedValue - quickstep_utility_Macros) + quickstep_utility_Macros + quickstep_utility_ScopedBuffer) target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg glog quickstep_catalog_CatalogTypedefs - quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory - quickstep_threading_SpinMutex quickstep_types_Type quickstep_types_TypeFactory quickstep_types_TypeID @@ -181,34 +161,23 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount glog quickstep_catalog_CatalogTypedefs - quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory quickstep_storage_ValueAccessor quickstep_storage_ValueAccessorUtil + quickstep_types_LongType quickstep_types_TypeFactory quickstep_types_TypeID quickstep_types_TypedValue quickstep_types_containers_ColumnVector quickstep_types_containers_ColumnVectorUtil quickstep_utility_Macros) -target_link_libraries(quickstep_expressions_aggregation_AggregationHandleDistinct - glog - quickstep_catalog_CatalogTypedefs - quickstep_expressions_aggregation_AggregationConcreteHandle - quickstep_storage_HashTable - quickstep_storage_HashTableBase - quickstep_types_TypedValue - quickstep_utility_Macros) target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax glog quickstep_catalog_CatalogTypedefs - quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory @@ -223,9 +192,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin glog quickstep_catalog_CatalogTypedefs - quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory @@ -240,21 +207,21 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum glog quickstep_catalog_CatalogTypedefs - quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory quickstep_threading_SpinMutex quickstep_types_Type quickstep_types_TypeFactory + quickstep_types_TypeFunctors quickstep_types_TypeID quickstep_types_TypedValue quickstep_types_operations_binaryoperations_BinaryOperation quickstep_types_operations_binaryoperations_BinaryOperationFactory quickstep_types_operations_binaryoperations_BinaryOperationID - quickstep_utility_Macros) + quickstep_utility_Macros + quickstep_utility_ScopedBuffer) # Submodule all-in-one library: add_library(quickstep_expressions_aggregation ../../empty_src.cpp) @@ -267,11 +234,9 @@ target_link_libraries(quickstep_expressions_aggregation quickstep_expressions_aggregation_AggregateFunctionMax quickstep_expressions_aggregation_AggregateFunctionMin quickstep_expressions_aggregation_AggregateFunctionSum - quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle quickstep_expressions_aggregation_AggregationHandleAvg quickstep_expressions_aggregation_AggregationHandleCount - quickstep_expressions_aggregation_AggregationHandleDistinct quickstep_expressions_aggregation_AggregationHandleMax quickstep_expressions_aggregation_AggregationHandleMin quickstep_expressions_aggregation_AggregationHandleSum http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/storage/AggregationHashTable.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationHashTable.hpp b/storage/AggregationHashTable.hpp deleted file mode 100644 index fca6d4c..0000000 --- a/storage/AggregationHashTable.hpp +++ /dev/null @@ -1,330 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#ifndef QUICKSTEP_STORAGE_AGGREGATION_HASH_TABLE_HPP_ -#define QUICKSTEP_STORAGE_AGGREGATION_HASH_TABLE_HPP_ - -#include <algorithm> -#include <atomic> -#include <cstddef> -#include <cstdlib> -#include <cstring> -#include <limits> -#include <memory> -#include <unordered_map> -#include <utility> -#include <vector> - -#include "expressions/aggregation/AggregationHandle.hpp" -#include "storage/HashTableBase.hpp" -#include "storage/HashTableUntypedKeyManager.hpp" -#include "storage/StorageBlob.hpp" -#include "storage/StorageBlockInfo.hpp" -#include "storage/StorageConstants.hpp" -#include "storage/StorageManager.hpp" -#include "storage/ValueAccessor.hpp" -#include "storage/ValueAccessorUtil.hpp" -#include "threading/SpinMutex.hpp" -#include "threading/SpinSharedMutex.hpp" -#include "types/Type.hpp" -#include "types/TypeFunctors.hpp" -#include "utility/Alignment.hpp" -#include "utility/InlineMemcpy.hpp" -#include "utility/Macros.hpp" -#include "utility/PrimeNumber.hpp" - -namespace quickstep { - -/** \addtogroup Storage - * @{ - */ - -template <bool use_mutex> -class AggregationHashTablePayloadManager { - public: - AggregationHashTablePayloadManager(const std::vector<AggregationHandle *> &handles) - : handles_(handles), - payload_size_in_bytes_(0) { - if (use_mutex) { - payload_size_in_bytes_ += sizeof(SpinMutex); - } - for (const AggregationHandle *handle : handles) { - const std::size_t state_size = handle->getStateSize(); - agg_state_sizes_.emplace_back(state_size); - agg_state_offsets_.emplace_back(payload_size_in_bytes_); - payload_size_in_bytes_ += state_size; - } - - initial_payload_ = std::malloc(payload_size_in_bytes_); - if (use_mutex) { - new(initial_payload_) Mutex; - } -// for (std::size_t i = 0; i < handles_.size(); ++i) { -// handles_[i]->initPayload( -// static_cast<std::uint8_t *>(initial_payload_) + agg_state_offsets_[i]); -// } - } - - ~AggregationHashTablePayloadManager() { - std::free(initial_payload_); - } - - inline std::size_t getPayloadSizeInBytes() const { - return payload_size_in_bytes_; - } - - inline void updatePayload(void *payload) const { - } - - inline void initPayload(void *payload) const { - } - - private: - std::vector<AggregationHandle *> handles_; - - std::vector<std::size_t> agg_state_sizes_; - std::vector<std::size_t> agg_state_offsets_; - std::size_t payload_size_in_bytes_; - - void *initial_payload_; - - DISALLOW_COPY_AND_ASSIGN(AggregationHashTablePayloadManager); -}; - -class ThreadPrivateAggregationHashTable : public AggregationHashTableBase { - public: - ThreadPrivateAggregationHashTable(const std::vector<const Type *> &key_types, - const std::size_t num_entries, - const std::vector<AggregationHandle *> &handles, - StorageManager *storage_manager) - : payload_manager_(handles), - key_types_(key_types), - key_manager_(this->key_types_, payload_manager_.getPayloadSizeInBytes()), - slots_(num_entries * kHashTableLoadFactor, - key_manager_.getUntypedKeyHashFunctor(), - key_manager_.getUntypedKeyEqualityFunctor()), - bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize(), - payload_manager_.getPayloadSizeInBytes())), - buckets_allocated_(0), - storage_manager_(storage_manager) { - std::size_t num_storage_slots = - this->storage_manager_->SlotsNeededForBytes(num_entries); - - // Get a StorageBlob to hold the hash table. - const block_id blob_id = this->storage_manager_->createBlob(num_storage_slots); - this->blob_ = this->storage_manager_->getBlobMutable(blob_id); - - buckets_ = this->blob_->getMemoryMutable(); - num_buckets_ = num_storage_slots * kSlotSizeBytes / bucket_size_; - } - - void resize() { - const std::size_t resized_memory_required = num_buckets_ * bucket_size_ * 2; - const std::size_t resized_storage_slots = - this->storage_manager_->SlotsNeededForBytes(resized_memory_required); - const block_id resized_blob_id = - this->storage_manager_->createBlob(resized_storage_slots); - MutableBlobReference resized_blob = - this->storage_manager_->getBlobMutable(resized_blob_id); - - void *resized_buckets = resized_blob->getMemoryMutable(); - std::memcpy(resized_buckets, buckets_, buckets_allocated_ * bucket_size_); - - for (auto &pair : slots_) { - pair.second = - (static_cast<const char *>(pair.first) - static_cast<char *>(buckets_)) - + static_cast<char *>(resized_buckets); - } - - buckets_ = resized_buckets; - num_buckets_ = resized_storage_slots * kSlotSizeBytes / bucket_size_; - std::swap(this->blob_, resized_blob); - } - - bool upsertValueAccessor(ValueAccessor *accessor, - const attribute_id key_attr_id, - const std::vector<attribute_id> &argument_ids, - const bool check_for_null_keys) override { - if (check_for_null_keys) { - return upsertValueAccessorInternal<true>( - accessor, key_attr_id, argument_ids); - } else { - return upsertValueAccessorInternal<false>( - accessor, key_attr_id, argument_ids); - } - } - - template <bool check_for_null_keys> - bool upsertValueAccessorInternal(ValueAccessor *accessor, - const attribute_id key_attr_id, - const std::vector<attribute_id> &argument_ids) { - return InvokeOnAnyValueAccessor( - accessor, - [&](auto *accessor) -> bool { // NOLINT(build/c++11) - accessor->beginIteration(); - while (accessor->next()) { - const void *key = accessor->template getUntypedValue<check_for_null_keys>(key_attr_id); - if (check_for_null_keys && key == nullptr) { - continue; - } - bool is_empty; - void *bucket = locateBucket(key, &is_empty); - if (is_empty) { - payload_manager_.initPayload(bucket); - } else { - payload_manager_.updatePayload(bucket); - } - } - return true; - }); - } - - bool upsertValueAccessorCompositeKey(ValueAccessor *accessor, - const std::vector<attribute_id> &key_attr_ids, - const std::vector<attribute_id> &argument_ids, - const bool check_for_null_keys) override { - if (check_for_null_keys) { - return upsertValueAccessorCompositeKeyInternal<true>( - accessor, key_attr_ids, argument_ids); - } else { - return upsertValueAccessorCompositeKeyInternal<false>( - accessor, key_attr_ids, argument_ids); - } - } - - template <bool check_for_null_keys> - bool upsertValueAccessorCompositeKeyInternal(ValueAccessor *accessor, - const std::vector<attribute_id> &key_attr_ids, - const std::vector<attribute_id> &argument_ids) { - return InvokeOnAnyValueAccessor( - accessor, - [&](auto *accessor) -> bool { // NOLINT(build/c++11) - accessor->beginIteration(); - void *prealloc_bucket = allocateBucket(); - while (accessor->next()) { - if (check_for_null_keys) { - const bool is_null = - key_manager_.writeNullableUntypedKeyFromValueAccessorToBucket( - accessor, - key_attr_ids, - prealloc_bucket); - if (is_null) { - continue; - } - } else { - key_manager_.writeUntypedKeyFromValueAccessorToBucket( - accessor, - key_attr_ids, - prealloc_bucket); - } - void *bucket = locateBucketWithPrealloc(prealloc_bucket); - if (bucket != prealloc_bucket) { - payload_manager_.initPayload(bucket); - prealloc_bucket = allocateBucket(); - } else { - payload_manager_.updatePayload(bucket); - } - } - // Reclaim the last unused bucket - --buckets_allocated_; - return true; - }); - } - - inline void* locateBucket(const void *key, bool *is_empty) { - auto slot_it = slots_.find(key); - if (slot_it == slots_.end()) { - void *bucket = allocateBucket(); - key_manager_.writeUntypedKeyToBucket(key, bucket); - slots_.emplace(key_manager_.getUntypedKeyComponent(bucket), bucket); - *is_empty = true; - return bucket; - } else { - *is_empty = false; - return slot_it->second; - } - } - - inline void* locateBucketWithPrealloc(void *prealloc_bucket) { - const void *key = key_manager_.getUntypedKeyComponent(prealloc_bucket); - auto slot_it = slots_.find(key); - if (slot_it == slots_.end()) { - slots_.emplace(key, prealloc_bucket); - return prealloc_bucket; - } else { - return slot_it->second; - } - } - - inline void* allocateBucket() { - if (buckets_allocated_ >= num_buckets_) { - resize(); - } - void *bucket = static_cast<char *>(buckets_) + buckets_allocated_ * bucket_size_; - ++buckets_allocated_; - return bucket; - } - - void print() const override { - std::cerr << "Bucket size = " << bucket_size_ << "\n"; - std::cerr << "Buckets: \n"; - for (const auto &pair : slots_) { - std::cerr << pair.first << " -- " << pair.second << "\n"; - std::cerr << *static_cast<const int *>(pair.second) << "\n"; - } - } - - private: - // Helper object to manage hash table payloads (i.e. aggregation states). - AggregationHashTablePayloadManager<false> payload_manager_; - - // Type(s) of keys. - const std::vector<const Type*> key_types_; - - // Helper object to manage key storage. - HashTableUntypedKeyManager key_manager_; - - // Round bucket size up to a multiple of kBucketAlignment. - static std::size_t ComputeBucketSize(const std::size_t fixed_key_size, - const std::size_t total_payload_size) { - constexpr std::size_t kBucketAlignment = 4; - return (((fixed_key_size + total_payload_size - 1) - / kBucketAlignment) + 1) * kBucketAlignment; - } - - std::unordered_map<const void *, void *, - UntypedKeyHashFunctor, - UntypedKeyEqualityFunctor> slots_; - - void *buckets_; - const std::size_t bucket_size_; - std::size_t num_buckets_; - std::size_t buckets_allocated_; - - StorageManager *storage_manager_; - MutableBlobReference blob_; - - DISALLOW_COPY_AND_ASSIGN(ThreadPrivateAggregationHashTable); -}; - - -} // namespace quickstep - -#endif // QUICKSTEP_STORAGE_AGGREGATION_HASH_TABLE_HPP_ - http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ccd5a31/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index fe16fc4..50e7c06 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -34,13 +34,11 @@ #include "expressions/aggregation/AggregateFunction.hpp" #include "expressions/aggregation/AggregateFunctionFactory.hpp" #include "expressions/aggregation/AggregationHandle.hpp" -#include "expressions/aggregation/AggregationHandleDistinct.hpp" #include "expressions/aggregation/AggregationID.hpp" #include "expressions/predicate/Predicate.hpp" #include "expressions/scalar/Scalar.hpp" -#include "storage/AggregationHashTable.hpp" #include "storage/AggregationOperationState.pb.h" -#include "storage/HashTable.hpp" +#include "storage/AggregationStateHashTable.hpp" #include "storage/HashTableBase.hpp" #include "storage/HashTableFactory.hpp" #include "storage/InsertDestination.hpp" @@ -88,122 +86,66 @@ AggregationOperationState::AggregationOperationState( std::vector<AggregationHandle *> group_by_handles; group_by_handles.clear(); - if (aggregate_functions.size() == 0) { - // If there is no aggregation function, then it is a distinctify operation - // on the group-by expressions. - DCHECK_GT(group_by_list_.size(), 0u); - - handles_.emplace_back(new AggregationHandleDistinct()); - arguments_.push_back({}); - is_distinct_.emplace_back(false); - group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries, - hash_table_impl_type, - group_by_types, - {1}, - handles_, - storage_manager)); - } else { - // Set up each individual aggregate in this operation. - std::vector<const AggregateFunction *>::const_iterator agg_func_it = - aggregate_functions.begin(); - std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator - args_it = arguments_.begin(); - std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin(); - std::vector<HashTableImplType>::const_iterator - distinctify_hash_table_impl_types_it = - distinctify_hash_table_impl_types.begin(); - std::vector<std::size_t> payload_sizes; - for (; agg_func_it != aggregate_functions.end(); - ++agg_func_it, ++args_it, ++is_distinct_it) { - // Get the Types of this aggregate's arguments so that we can create an - // AggregationHandle. - std::vector<const Type *> argument_types; - for (const std::unique_ptr<const Scalar> &argument : *args_it) { - argument_types.emplace_back(&argument->getType()); - } - - // Sanity checks: aggregate function exists and can apply to the specified - // arguments. - DCHECK(*agg_func_it != nullptr); - DCHECK((*agg_func_it)->canApplyToTypes(argument_types)); - - // Have the AggregateFunction create an AggregationHandle that we can use - // to do actual aggregate computation. - handles_.emplace_back((*agg_func_it)->createHandle(argument_types)); + // Set up each individual aggregate in this operation. + for (std::size_t i = 0; i < aggregate_functions.size(); ++i) { + // Get the Types of this aggregate's arguments so that we can create an + // AggregationHandle. + std::vector<const Type *> argument_types; + for (const std::unique_ptr<const Scalar> &argument : arguments[i]) { + argument_types.emplace_back(&argument->getType()); + } - if (!group_by_list_.empty()) { - // Aggregation with GROUP BY: combined payload is partially updated in - // the presence of DISTINCT. - if (*is_distinct_it) { - handles_.back()->blockUpdate(); - } - group_by_handles.emplace_back(handles_.back()); - payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize()); - } else { - // Aggregation without GROUP BY: create a single global state. - single_states_.emplace_back(handles_.back()->createInitialState()); + // Sanity checks: aggregate function exists and can apply to the specified + // arguments. + const AggregateFunction *agg_func = aggregate_functions[i]; + DCHECK(agg_func != nullptr); + DCHECK(agg_func->canApplyToTypes(argument_types)); + + // Have the AggregateFunction create an AggregationHandle that we can use + // to do actual aggregate computation. + handles_.emplace_back(agg_func->createHandle(argument_types)); + + if (!group_by_list_.empty()) { + // TODO(jianqiao): handle DISTINCT aggregation. + // if (is_distinct[i]) { + // } + group_by_handles.emplace_back(handles_.back()); + } else { + // Aggregation without GROUP BY: create a single global state. + single_states_.emplace_back(handles_.back()->createInitialState()); #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - // See if all of this aggregate's arguments are attributes in the input - // relation. If so, remember the attribute IDs so that we can do copy - // elision when actually performing the aggregation. - std::vector<attribute_id> local_arguments_as_attributes; - local_arguments_as_attributes.reserve(args_it->size()); - for (const std::unique_ptr<const Scalar> &argument : *args_it) { - const attribute_id argument_id = - argument->getAttributeIdForValueAccessor(); - if (argument_id == -1) { - local_arguments_as_attributes.clear(); - break; - } else { - DCHECK_EQ(input_relation_.getID(), - argument->getRelationIdForValueAccessor()); - local_arguments_as_attributes.push_back(argument_id); - } + // See if all of this aggregate's arguments are attributes in the input + // relation. If so, remember the attribute IDs so that we can do copy + // elision when actually performing the aggregation. + std::vector<attribute_id> local_arguments_as_attributes; + local_arguments_as_attributes.reserve(arguments[i].size()); + for (const std::unique_ptr<const Scalar> &argument : arguments[i]) { + const attribute_id argument_id = + argument->getAttributeIdForValueAccessor(); + if (argument_id == -1) { + local_arguments_as_attributes.clear(); + break; + } else { + DCHECK_EQ(input_relation_.getID(), + argument->getRelationIdForValueAccessor()); + local_arguments_as_attributes.push_back(argument_id); } - - arguments_as_attributes_.emplace_back( - std::move(local_arguments_as_attributes)); -#endif } - // Initialize the corresponding distinctify hash table if this is a - // DISTINCT - // aggregation. - if (*is_distinct_it) { - std::vector<const Type *> key_types(group_by_types); - key_types.insert( - key_types.end(), argument_types.begin(), argument_types.end()); - // TODO(jianqiao): estimated_num_entries is quite inaccurate for - // estimating - // the number of entries in the distinctify hash table. We may estimate - // for each distinct aggregation an estimated_num_distinct_keys value - // during - // query optimization, if it worths. - distinctify_hashtables_.emplace_back( - AggregationStateFastHashTableFactory::CreateResizable( - *distinctify_hash_table_impl_types_it, - key_types, - estimated_num_entries, - {0}, - {}, - storage_manager)); - ++distinctify_hash_table_impl_types_it; - } else { - distinctify_hashtables_.emplace_back(nullptr); - } + arguments_as_attributes_.emplace_back( + std::move(local_arguments_as_attributes)); +#endif } + } - if (!group_by_handles.empty()) { - // Aggregation with GROUP BY: create a HashTable pool for per-group - // states. - group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries, - hash_table_impl_type, - group_by_types, - payload_sizes, - group_by_handles, - storage_manager)); - } + if (!group_by_handles.empty()) { + // Aggregation with GROUP BY: create a HashTable pool for per-group states. + group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries, + hash_table_impl_type, + group_by_types, + group_by_handles, + storage_manager)); } } @@ -352,12 +294,12 @@ void AggregationOperationState::finalizeAggregate( } void AggregationOperationState::mergeSingleState( - const std::vector<std::unique_ptr<AggregationState>> &local_state) { + const std::vector<ScopedBuffer> &local_state) { DEBUG_ASSERT(local_state.size() == single_states_.size()); for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { if (!is_distinct_[agg_idx]) { - handles_[agg_idx]->mergeStates(*local_state[agg_idx], - single_states_[agg_idx].get()); + handles_[agg_idx]->mergeStates(single_states_[agg_idx].get(), + local_state[agg_idx].get()); } } } @@ -365,7 +307,7 @@ void AggregationOperationState::mergeSingleState( void AggregationOperationState::aggregateBlockSingleState( const block_id input_block) { // Aggregate per-block state for each aggregate. - std::vector<std::unique_ptr<AggregationState>> local_state; + std::vector<ScopedBuffer> local_state; BlockReference block( storage_manager_->getBlock(input_block, input_relation_)); @@ -386,14 +328,7 @@ void AggregationOperationState::aggregateBlockSingleState( // Call StorageBlock::aggregateDistinct() to put the arguments as keys // directly into the (threadsafe) shared global distinctify HashTable // for this aggregate. - block->aggregateDistinct(*handles_[agg_idx], - arguments_[agg_idx], - local_arguments_as_attributes, - {}, /* group_by */ - predicate_.get(), - distinctify_hashtables_[agg_idx].get(), - &reuse_matches, - nullptr /* reuse_group_by_vectors */); + // TODO(jianqiao): handle DISTINCT aggregation. local_state.emplace_back(nullptr); } else { // Call StorageBlock::aggregate() to actually do the aggregation. @@ -426,18 +361,10 @@ void AggregationOperationState::aggregateBlockHashTable( for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { if (is_distinct_[agg_idx]) { - // Call StorageBlock::aggregateDistinct() to insert the GROUP BY - // expression + // Call StorageBlock::aggregateDistinct() to insert the GROUP BY expression // values and the aggregation arguments together as keys directly into the // (threadsafe) shared global distinctify HashTable for this aggregate. - block->aggregateDistinct(*handles_[agg_idx], - arguments_[agg_idx], - nullptr, /* arguments_as_attributes */ - group_by_list_, - predicate_.get(), - distinctify_hashtables_[agg_idx].get(), - &reuse_matches, - &reuse_group_by_vectors); + // TODO(jianqiao): handle DISTINCT aggregation. } } @@ -445,16 +372,13 @@ void AggregationOperationState::aggregateBlockHashTable( // directly into the (threadsafe) shared global HashTable for this // aggregate. DCHECK(group_by_hashtable_pool_ != nullptr); - AggregationStateHashTableBase *agg_hash_table = - group_by_hashtable_pool_->getHashTableFast(); + auto *agg_hash_table = group_by_hashtable_pool_->getHashTable(); DCHECK(agg_hash_table != nullptr); block->aggregateGroupBy(arguments_, group_by_list_, predicate_.get(), agg_hash_table, - group_by_hashtable_pool_->createNewThreadPrivateHashTable(), -// nullptr, &reuse_matches, &reuse_group_by_vectors); group_by_hashtable_pool_->returnHashTable(agg_hash_table); @@ -468,23 +392,23 @@ void AggregationOperationState::finalizeSingleState( for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { if (is_distinct_[agg_idx]) { - single_states_[agg_idx].reset( - handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle( - *distinctify_hashtables_[agg_idx])); + // TODO(jianqiao): handle DISTINCT aggregation } attribute_values.emplace_back( - handles_[agg_idx]->finalize(*single_states_[agg_idx])); + handles_[agg_idx]->finalize(single_states_[agg_idx].get())); } output_destination->insertTuple(Tuple(std::move(attribute_values))); } void AggregationOperationState::mergeGroupByHashTables( - AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) { - HashTableMergerFast merger(dst); - (static_cast<FastHashTable<true, false, true, false> *>(src)) - ->forEachCompositeKeyFast(&merger); + AggregationStateHashTableBase *destination_hash_table, + const AggregationStateHashTableBase *source_hash_table) { + static_cast<ThreadPrivateAggregationStateHashTable *>( + destination_hash_table)->mergeHashTable( + static_cast<const ThreadPrivateAggregationStateHashTable *>( + source_hash_table)); } void AggregationOperationState::finalizeHashTable( @@ -501,103 +425,22 @@ void AggregationOperationState::finalizeHashTable( // e.g. Keep merging entries from smaller hash tables to larger. auto *hash_tables = group_by_hashtable_pool_->getAllHashTables(); - if (hash_tables->size() > 1) { - for (int hash_table_index = 0; - hash_table_index < static_cast<int>(hash_tables->size() - 1); - ++hash_table_index) { - // Merge each hash table to the last hash table. - mergeGroupByHashTables((*hash_tables)[hash_table_index].get(), - hash_tables->back().get()); - } - } - - // Collect per-aggregate finalized values. - std::vector<std::unique_ptr<ColumnVector>> final_values; - for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { - if (is_distinct_[agg_idx]) { - DCHECK(group_by_hashtable_pool_ != nullptr); - auto *hash_tables = group_by_hashtable_pool_->getAllHashTables(); - DCHECK(hash_tables != nullptr); - if (hash_tables->empty()) { - // We may have a case where hash_tables is empty, e.g. no input blocks. - // However for aggregateOnDistinctifyHashTableForGroupBy to work - // correctly, we should create an empty group by hash table. - AggregationStateHashTableBase *new_hash_table = - group_by_hashtable_pool_->getHashTableFast(); - group_by_hashtable_pool_->returnHashTable(new_hash_table); - hash_tables = group_by_hashtable_pool_->getAllHashTables(); - } - DCHECK(hash_tables->back() != nullptr); - AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get(); - DCHECK(agg_hash_table != nullptr); - handles_[agg_idx]->allowUpdate(); - handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy( - *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx); - } - - auto *hash_tables = group_by_hashtable_pool_->getAllHashTables(); - DCHECK(hash_tables != nullptr); - if (hash_tables->empty()) { - // We may have a case where hash_tables is empty, e.g. no input blocks. - // However for aggregateOnDistinctifyHashTableForGroupBy to work - // correctly, we should create an empty group by hash table. - AggregationStateHashTableBase *new_hash_table = - group_by_hashtable_pool_->getHashTableFast(); - group_by_hashtable_pool_->returnHashTable(new_hash_table); - hash_tables = group_by_hashtable_pool_->getAllHashTables(); - } - AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get(); - DCHECK(agg_hash_table != nullptr); - ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable( - *agg_hash_table, &group_by_keys, agg_idx); - if (agg_result_col != nullptr) { - final_values.emplace_back(agg_result_col); - } - } - - // Reorganize 'group_by_keys' in column-major order so that we can make a - // ColumnVectorsValueAccessor to bulk-insert results. - // - // TODO(chasseur): Shuffling around the GROUP BY keys like this is suboptimal - // if there is only one aggregate. The need to do this should hopefully go - // away when we work out storing composite structures for multiple aggregates - // in a single HashTable. - std::vector<std::unique_ptr<ColumnVector>> group_by_cvs; - std::size_t group_by_element_idx = 0; - for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) { - const Type &group_by_type = group_by_element->getType(); - if (NativeColumnVector::UsableForType(group_by_type)) { - NativeColumnVector *element_cv = - new NativeColumnVector(group_by_type, group_by_keys.size()); - group_by_cvs.emplace_back(element_cv); - for (std::vector<TypedValue> &group_key : group_by_keys) { - element_cv->appendTypedValue( - std::move(group_key[group_by_element_idx])); - } - } else { - IndirectColumnVector *element_cv = - new IndirectColumnVector(group_by_type, group_by_keys.size()); - group_by_cvs.emplace_back(element_cv); - for (std::vector<TypedValue> &group_key : group_by_keys) { - element_cv->appendTypedValue( - std::move(group_key[group_by_element_idx])); - } - } - ++group_by_element_idx; + if (hash_tables->size() == 0) { + return; } - // Stitch together a ColumnVectorsValueAccessor combining the GROUP BY keys - // and the finalized aggregates. - ColumnVectorsValueAccessor complete_result; - for (std::unique_ptr<ColumnVector> &group_by_cv : group_by_cvs) { - complete_result.addColumn(group_by_cv.release()); - } - for (std::unique_ptr<ColumnVector> &final_value_cv : final_values) { - complete_result.addColumn(final_value_cv.release()); + std::unique_ptr<AggregationStateHashTableBase> final_hash_table( + hash_tables->back().release()); + for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) { + std::unique_ptr<AggregationStateHashTableBase> hash_table( + hash_tables->at(i).release()); + mergeGroupByHashTables(final_hash_table.get(), hash_table.get()); } // Bulk-insert the complete result. - output_destination->bulkInsertTuples(&complete_result); + std::unique_ptr<AggregationResultIterator> results( + final_hash_table->createResultIterator()); + output_destination->bulkInsertAggregationResults(results.get()); } } // namespace quickstep