http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleMax.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp index 7e38473..5fb9f44 100644 --- a/expressions/aggregation/AggregationHandleMax.hpp +++ b/expressions/aggregation/AggregationHandleMax.hpp @@ -28,6 +28,7 @@ #include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" +#include "storage/FastHashTable.hpp" #include "storage/HashTableBase.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" @@ -55,25 +56,24 @@ class AggregationStateMax : public AggregationState { /** * @brief Copy constructor (ignores mutex). */ - AggregationStateMax(const AggregationStateMax &orig) - : max_(orig.max_) { - } + AggregationStateMax(const AggregationStateMax &orig) : max_(orig.max_) {} /** * @brief Destructor. */ - ~AggregationStateMax() override {}; + ~AggregationStateMax() override{}; + + const std::uint8_t* getPayloadAddress() const { + return reinterpret_cast<const uint8_t *>(&max_); + } private: friend class AggregationHandleMax; explicit AggregationStateMax(const Type &type) - : max_(type.getNullableVersion().makeNullValue()) { - } + : max_(type.getNullableVersion().makeNullValue()) {} - explicit AggregationStateMax(TypedValue &&value) - : max_(std::move(value)) { - } + explicit AggregationStateMax(TypedValue &&value) : max_(std::move(value)) {} TypedValue max_; SpinMutex mutex_; @@ -84,8 +84,7 @@ class AggregationStateMax : public AggregationState { **/ class AggregationHandleMax : public AggregationConcreteHandle { public: - ~AggregationHandleMax() override { - } + ~AggregationHandleMax() override {} AggregationState* createInitialState() const override { return new AggregationStateMax(type_); @@ -93,20 +92,46 @@ class AggregationHandleMax : public AggregationConcreteHandle { AggregationStateHashTableBase* createGroupByHashTable( const HashTableImplType hash_table_impl, - const std::vector<const Type*> &group_by_types, + const std::vector<const Type *> &group_by_types, const std::size_t estimated_num_groups, StorageManager *storage_manager) const override; /** * @brief Iterate with max aggregation state. */ - inline void iterateUnaryInl(AggregationStateMax *state, const TypedValue &value) const { + inline void iterateUnaryInl(AggregationStateMax *state, + const TypedValue &value) const { DCHECK(value.isPlausibleInstanceOf(type_.getSignature())); - compareAndUpdate(static_cast<AggregationStateMax*>(state), value); + compareAndUpdate(static_cast<AggregationStateMax *>(state), value); + } + + inline void iterateUnaryInlFast(const TypedValue &value, + std::uint8_t *byte_ptr) const { + DCHECK(value.isPlausibleInstanceOf(type_.getSignature())); + TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr); + compareAndUpdateFast(max_ptr, value); + } + + inline void updateStateUnary(const TypedValue &argument, + std::uint8_t *byte_ptr) const override { + if (!block_update_) { + iterateUnaryInlFast(argument, byte_ptr); + } + } + + void blockUpdate() override { block_update_ = true; } + + void allowUpdate() override { block_update_ = false; } + + void initPayload(std::uint8_t *byte_ptr) const override { + TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr); + TypedValue t1 = (type_.getNullableVersion().makeNullValue()); + *max_ptr = t1; } AggregationState* accumulateColumnVectors( - const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override; + const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) + const override; #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION AggregationState* accumulateValueAccessor( @@ -123,37 +148,49 @@ class AggregationHandleMax : public AggregationConcreteHandle { void mergeStates(const AggregationState &source, AggregationState *destination) const override; + void mergeStatesFast(const std::uint8_t *source, + std::uint8_t *destination) const override; + TypedValue finalize(const AggregationState &state) const override { - return TypedValue(static_cast<const AggregationStateMax&>(state).max_); + return TypedValue(static_cast<const AggregationStateMax &>(state).max_); + } + + inline TypedValue finalizeHashTableEntry( + const AggregationState &state) const { + return TypedValue(static_cast<const AggregationStateMax &>(state).max_); } - inline TypedValue finalizeHashTableEntry(const AggregationState &state) const { - return TypedValue(static_cast<const AggregationStateMax&>(state).max_); + inline TypedValue finalizeHashTableEntryFast( + const std::uint8_t *byte_ptr) const { + const TypedValue *max_ptr = reinterpret_cast<const TypedValue *>(byte_ptr); + return TypedValue(*max_ptr); } ColumnVector* finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const override; + std::vector<std::vector<TypedValue>> *group_by_keys, + int index) const override; /** - * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle() + * @brief Implementation of + * AggregationHandle::aggregateOnDistinctifyHashTableForSingle() * for MAX aggregation. */ AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) const override; - + const AggregationStateHashTableBase &distinctify_hash_table) + const override; /** - * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() + * @brief Implementation of + * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() * for MAX aggregation. */ void aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table) const override; + AggregationStateHashTableBase *aggregation_hash_table, + std::size_t index) const override; - void mergeGroupByHashTables( - const AggregationStateHashTableBase &source_hash_table, - AggregationStateHashTableBase *destination_hash_table) const override; + std::size_t getPayloadSize() const override { return sizeof(TypedValue); } private: friend class AggregateFunctionMax; @@ -166,24 +203,37 @@ class AggregationHandleMax : public AggregationConcreteHandle { explicit AggregationHandleMax(const Type &type); /** - * @brief compare the value with max_ and update it if the value is larger than - * current maximum. NULLs are ignored. + * @brief compare the value with max_ and update it if the value is larger + * than current maximum. NULLs are ignored. * * @param value A TypedValue to compare **/ - inline void compareAndUpdate(AggregationStateMax *state, const TypedValue &value) const { + inline void compareAndUpdate(AggregationStateMax *state, + const TypedValue &value) const { // TODO(chasseur): Avoid null-checks when aggregating a non-nullable Type. if (value.isNull()) return; SpinMutexLock lock(state->mutex_); - if (state->max_.isNull() || fast_comparator_->compareTypedValues(value, state->max_)) { + if (state->max_.isNull() || + fast_comparator_->compareTypedValues(value, state->max_)) { state->max_ = value; } } + inline void compareAndUpdateFast(TypedValue *max_ptr, + const TypedValue &value) const { + if (value.isNull()) return; + if (max_ptr->isNull() || + fast_comparator_->compareTypedValues(value, *max_ptr)) { + *max_ptr = value; + } + } + const Type &type_; std::unique_ptr<UncheckedComparator> fast_comparator_; + bool block_update_; + DISALLOW_COPY_AND_ASSIGN(AggregationHandleMax); };
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleMin.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp index e860d8d..a07f299 100644 --- a/expressions/aggregation/AggregationHandleMin.cpp +++ b/expressions/aggregation/AggregationHandleMin.cpp @@ -39,22 +39,19 @@ namespace quickstep { class StorageManager; AggregationHandleMin::AggregationHandleMin(const Type &type) - : type_(type) { - fast_comparator_.reset(ComparisonFactory::GetComparison(ComparisonID::kLess) - .makeUncheckedComparatorForTypes(type, - type.getNonNullableVersion())); + : type_(type), block_update_(false) { + fast_comparator_.reset( + ComparisonFactory::GetComparison(ComparisonID::kLess) + .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion())); } AggregationStateHashTableBase* AggregationHandleMin::createGroupByHashTable( const HashTableImplType hash_table_impl, - const std::vector<const Type*> &group_by_types, + const std::vector<const Type *> &group_by_types, const std::size_t estimated_num_groups, StorageManager *storage_manager) const { return AggregationStateHashTableFactory<AggregationStateMin>::CreateResizable( - hash_table_impl, - group_by_types, - estimated_num_groups, - storage_manager); + hash_table_impl, group_by_types, estimated_num_groups, storage_manager); } AggregationState* AggregationHandleMin::accumulateColumnVectors( @@ -62,9 +59,8 @@ AggregationState* AggregationHandleMin::accumulateColumnVectors( DCHECK_EQ(1u, column_vectors.size()) << "Got wrong number of ColumnVectors for MIN: " << column_vectors.size(); - return new AggregationStateMin( - fast_comparator_->accumulateColumnVector(type_.getNullableVersion().makeNullValue(), - *column_vectors.front())); + return new AggregationStateMin(fast_comparator_->accumulateColumnVector( + type_.getNullableVersion().makeNullValue(), *column_vectors.front())); } #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION @@ -74,10 +70,10 @@ AggregationState* AggregationHandleMin::accumulateValueAccessor( DCHECK_EQ(1u, accessor_ids.size()) << "Got wrong number of attributes for MIN: " << accessor_ids.size(); - return new AggregationStateMin( - fast_comparator_->accumulateValueAccessor(type_.getNullableVersion().makeNullValue(), - accessor, - accessor_ids.front())); + return new AggregationStateMin(fast_comparator_->accumulateValueAccessor( + type_.getNullableVersion().makeNullValue(), + accessor, + accessor_ids.front())); } #endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION @@ -88,66 +84,55 @@ void AggregationHandleMin::aggregateValueAccessorIntoHashTable( AggregationStateHashTableBase *hash_table) const { DCHECK_EQ(1u, argument_ids.size()) << "Got wrong number of arguments for MIN: " << argument_ids.size(); - - aggregateValueAccessorIntoHashTableUnaryHelper< - AggregationHandleMin, - AggregationStateMin, - AggregationStateHashTable<AggregationStateMin>>( - accessor, - argument_ids.front(), - group_by_key_ids, - AggregationStateMin(type_), - hash_table); } -void AggregationHandleMin::mergeStates( - const AggregationState &source, - AggregationState *destination) const { - const AggregationStateMin &min_source = static_cast<const AggregationStateMin&>(source); - AggregationStateMin *min_destination = static_cast<AggregationStateMin*>(destination); +void AggregationHandleMin::mergeStates(const AggregationState &source, + AggregationState *destination) const { + const AggregationStateMin &min_source = + static_cast<const AggregationStateMin &>(source); + AggregationStateMin *min_destination = + static_cast<AggregationStateMin *>(destination); if (!min_source.min_.isNull()) { compareAndUpdate(min_destination, min_source.min_); } } +void AggregationHandleMin::mergeStatesFast(const std::uint8_t *source, + std::uint8_t *destination) const { + const TypedValue *src_min_ptr = reinterpret_cast<const TypedValue *>(source); + TypedValue *dst_min_ptr = reinterpret_cast<TypedValue *>(destination); + + if (!(src_min_ptr->isNull())) { + compareAndUpdateFast(dst_min_ptr, *src_min_ptr); + } +} + ColumnVector* AggregationHandleMin::finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const { - return finalizeHashTableHelper<AggregationHandleMin, - AggregationStateHashTable<AggregationStateMin>>( - type_.getNonNullableVersion(), - hash_table, - group_by_keys); + std::vector<std::vector<TypedValue>> *group_by_keys, + int index) const { + return finalizeHashTableHelperFast<AggregationHandleMin, + AggregationStateFastHashTable>( + type_.getNonNullableVersion(), hash_table, group_by_keys, index); } -AggregationState* AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle( +AggregationState* +AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle( const AggregationStateHashTableBase &distinctify_hash_table) const { - return aggregateOnDistinctifyHashTableForSingleUnaryHelper< + return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< AggregationHandleMin, - AggregationStateMin>( - distinctify_hash_table); + AggregationStateMin>(distinctify_hash_table); } void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table) const { - aggregateOnDistinctifyHashTableForGroupByUnaryHelper< + AggregationStateHashTableBase *aggregation_hash_table, + std::size_t index) const { + aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< AggregationHandleMin, - AggregationStateMin, - AggregationStateHashTable<AggregationStateMin>>( - distinctify_hash_table, - AggregationStateMin(type_), - aggregation_hash_table); -} - -void AggregationHandleMin::mergeGroupByHashTables( - const AggregationStateHashTableBase &source_hash_table, - AggregationStateHashTableBase *destination_hash_table) const { - mergeGroupByHashTablesHelper<AggregationHandleMin, - AggregationStateMin, - AggregationStateHashTable<AggregationStateMin>>( - source_hash_table, destination_hash_table); + AggregationStateFastHashTable>( + distinctify_hash_table, aggregation_hash_table, index); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleMin.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp index 924698c..173911d 100644 --- a/expressions/aggregation/AggregationHandleMin.hpp +++ b/expressions/aggregation/AggregationHandleMin.hpp @@ -28,6 +28,7 @@ #include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" +#include "storage/FastHashTable.hpp" #include "storage/HashTableBase.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" @@ -55,24 +56,26 @@ class AggregationStateMin : public AggregationState { /** * @brief Copy constructor (ignores mutex). */ - AggregationStateMin(const AggregationStateMin &orig) - : min_(orig.min_) { - } + AggregationStateMin(const AggregationStateMin &orig) : min_(orig.min_) {} /** * @brief Destructor. */ ~AggregationStateMin() override {} + std::size_t getPayloadSize() const { return sizeof(TypedValue); } + + const std::uint8_t *getPayloadAddress() const { + return reinterpret_cast<const uint8_t *>(&min_); + } + private: friend class AggregationHandleMin; explicit AggregationStateMin(const Type &type) : min_(type.getNullableVersion().makeNullValue()) {} - explicit AggregationStateMin(TypedValue &&value) - : min_(std::move(value)) { - } + explicit AggregationStateMin(TypedValue &&value) : min_(std::move(value)) {} TypedValue min_; SpinMutex mutex_; @@ -83,8 +86,7 @@ class AggregationStateMin : public AggregationState { **/ class AggregationHandleMin : public AggregationConcreteHandle { public: - ~AggregationHandleMin() override { - } + ~AggregationHandleMin() override {} AggregationState* createInitialState() const override { return new AggregationStateMin(type_); @@ -92,20 +94,46 @@ class AggregationHandleMin : public AggregationConcreteHandle { AggregationStateHashTableBase* createGroupByHashTable( const HashTableImplType hash_table_impl, - const std::vector<const Type*> &group_by_types, + const std::vector<const Type *> &group_by_types, const std::size_t estimated_num_groups, StorageManager *storage_manager) const override; /** * @brief Iterate with min aggregation state. */ - inline void iterateUnaryInl(AggregationStateMin *state, const TypedValue &value) const { + inline void iterateUnaryInl(AggregationStateMin *state, + const TypedValue &value) const { DCHECK(value.isPlausibleInstanceOf(type_.getSignature())); compareAndUpdate(state, value); } + inline void iterateUnaryInlFast(const TypedValue &value, + std::uint8_t *byte_ptr) const { + DCHECK(value.isPlausibleInstanceOf(type_.getSignature())); + TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr); + compareAndUpdateFast(min_ptr, value); + } + + inline void updateStateUnary(const TypedValue &argument, + std::uint8_t *byte_ptr) const override { + if (!block_update_) { + iterateUnaryInlFast(argument, byte_ptr); + } + } + + void blockUpdate() override { block_update_ = true; } + + void allowUpdate() override { block_update_ = false; } + + void initPayload(std::uint8_t *byte_ptr) const override { + TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr); + TypedValue t1 = (type_.getNullableVersion().makeNullValue()); + *min_ptr = t1; + } + AggregationState* accumulateColumnVectors( - const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override; + const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) + const override; #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION AggregationState* accumulateValueAccessor( @@ -122,36 +150,49 @@ class AggregationHandleMin : public AggregationConcreteHandle { void mergeStates(const AggregationState &source, AggregationState *destination) const override; + void mergeStatesFast(const std::uint8_t *source, + std::uint8_t *destination) const override; + TypedValue finalize(const AggregationState &state) const override { - return static_cast<const AggregationStateMin&>(state).min_; + return static_cast<const AggregationStateMin &>(state).min_; + } + + inline TypedValue finalizeHashTableEntry( + const AggregationState &state) const { + return static_cast<const AggregationStateMin &>(state).min_; } - inline TypedValue finalizeHashTableEntry(const AggregationState &state) const { - return static_cast<const AggregationStateMin&>(state).min_; + inline TypedValue finalizeHashTableEntryFast( + const std::uint8_t *byte_ptr) const { + const TypedValue *min_ptr = reinterpret_cast<const TypedValue *>(byte_ptr); + return TypedValue(*min_ptr); } ColumnVector* finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const override; + std::vector<std::vector<TypedValue>> *group_by_keys, + int index) const override; /** - * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle() + * @brief Implementation of + * AggregationHandle::aggregateOnDistinctifyHashTableForSingle() * for MIN aggregation. */ AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) const override; + const AggregationStateHashTableBase &distinctify_hash_table) + const override; /** - * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() + * @brief Implementation of + * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() * for MIN aggregation. */ void aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table) const override; + AggregationStateHashTableBase *aggregation_hash_table, + std::size_t index) const override; - void mergeGroupByHashTables( - const AggregationStateHashTableBase &source_hash_table, - AggregationStateHashTableBase *destination_hash_table) const override; + std::size_t getPayloadSize() const override { return sizeof(TypedValue); } private: friend class AggregateFunctionMin; @@ -164,23 +205,36 @@ class AggregationHandleMin : public AggregationConcreteHandle { explicit AggregationHandleMin(const Type &type); /** - * @brief compare the value with min_ and update it if the value is smaller than - * current minimum. NULLs are ignored. + * @brief compare the value with min_ and update it if the value is smaller + * than current minimum. NULLs are ignored. * * @param value A TypedValue to compare. **/ - inline void compareAndUpdate(AggregationStateMin *state, const TypedValue &value) const { + inline void compareAndUpdate(AggregationStateMin *state, + const TypedValue &value) const { if (value.isNull()) return; SpinMutexLock lock(state->mutex_); - if (state->min_.isNull() || fast_comparator_->compareTypedValues(value, state->min_)) { + if (state->min_.isNull() || + fast_comparator_->compareTypedValues(value, state->min_)) { state->min_ = value; } } + inline void compareAndUpdateFast(TypedValue *min_ptr, + const TypedValue &value) const { + if (value.isNull()) return; + if (min_ptr->isNull() || + fast_comparator_->compareTypedValues(value, *min_ptr)) { + *min_ptr = value; + } + } + const Type &type_; std::unique_ptr<UncheckedComparator> fast_comparator_; + bool block_update_; + DISALLOW_COPY_AND_ASSIGN(AggregationHandleMin); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleSum.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp index b5036a8..642d88d 100644 --- a/expressions/aggregation/AggregationHandleSum.cpp +++ b/expressions/aggregation/AggregationHandleSum.cpp @@ -43,7 +43,7 @@ namespace quickstep { class StorageManager; AggregationHandleSum::AggregationHandleSum(const Type &type) - : argument_type_(type) { + : argument_type_(type), block_update_(false) { // We sum Int as Long and Float as Double so that we have more headroom when // adding many values. TypeID type_precision_id; @@ -66,11 +66,13 @@ AggregationHandleSum::AggregationHandleSum(const Type &type) // Make operators to do arithmetic: // Add operator for summing argument values. - fast_operator_.reset(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd) - .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type_)); + fast_operator_.reset( + BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd) + .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type_)); // Add operator for merging states. - merge_operator_.reset(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd) - .makeUncheckedBinaryOperatorForTypes(sum_type, sum_type)); + merge_operator_.reset( + BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd) + .makeUncheckedBinaryOperatorForTypes(sum_type, sum_type)); // Result is nullable, because SUM() over 0 values (or all NULL values) is // NULL. @@ -79,26 +81,20 @@ AggregationHandleSum::AggregationHandleSum(const Type &type) AggregationStateHashTableBase* AggregationHandleSum::createGroupByHashTable( const HashTableImplType hash_table_impl, - const std::vector<const Type*> &group_by_types, + const std::vector<const Type *> &group_by_types, const std::size_t estimated_num_groups, StorageManager *storage_manager) const { return AggregationStateHashTableFactory<AggregationStateSum>::CreateResizable( - hash_table_impl, - group_by_types, - estimated_num_groups, - storage_manager); + hash_table_impl, group_by_types, estimated_num_groups, storage_manager); } AggregationState* AggregationHandleSum::accumulateColumnVectors( const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const { DCHECK_EQ(1u, column_vectors.size()) << "Got wrong number of ColumnVectors for SUM: " << column_vectors.size(); - std::size_t num_tuples = 0; TypedValue cv_sum = fast_operator_->accumulateColumnVector( - blank_state_.sum_, - *column_vectors.front(), - &num_tuples); + blank_state_.sum_, *column_vectors.front(), &num_tuples); return new AggregationStateSum(std::move(cv_sum), num_tuples == 0); } @@ -111,10 +107,7 @@ AggregationState* AggregationHandleSum::accumulateValueAccessor( std::size_t num_tuples = 0; TypedValue va_sum = fast_operator_->accumulateValueAccessor( - blank_state_.sum_, - accessor, - accessor_ids.front(), - &num_tuples); + blank_state_.sum_, accessor, accessor_ids.front(), &num_tuples); return new AggregationStateSum(std::move(va_sum), num_tuples == 0); } #endif @@ -126,32 +119,39 @@ void AggregationHandleSum::aggregateValueAccessorIntoHashTable( AggregationStateHashTableBase *hash_table) const { DCHECK_EQ(1u, argument_ids.size()) << "Got wrong number of arguments for SUM: " << argument_ids.size(); - - aggregateValueAccessorIntoHashTableUnaryHelper< - AggregationHandleSum, - AggregationStateSum, - AggregationStateHashTable<AggregationStateSum>>( - accessor, - argument_ids.front(), - group_by_key_ids, - blank_state_, - hash_table); } -void AggregationHandleSum::mergeStates( - const AggregationState &source, - AggregationState *destination) const { - const AggregationStateSum &sum_source = static_cast<const AggregationStateSum&>(source); - AggregationStateSum *sum_destination = static_cast<AggregationStateSum*>(destination); +void AggregationHandleSum::mergeStates(const AggregationState &source, + AggregationState *destination) const { + const AggregationStateSum &sum_source = + static_cast<const AggregationStateSum &>(source); + AggregationStateSum *sum_destination = + static_cast<AggregationStateSum *>(destination); SpinMutexLock lock(sum_destination->mutex_); - sum_destination->sum_ = merge_operator_->applyToTypedValues(sum_destination->sum_, - sum_source.sum_); + sum_destination->sum_ = merge_operator_->applyToTypedValues( + sum_destination->sum_, sum_source.sum_); sum_destination->null_ = sum_destination->null_ && sum_source.null_; } +void AggregationHandleSum::mergeStatesFast(const std::uint8_t *source, + std::uint8_t *destination) const { + const TypedValue *src_sum_ptr = + reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_); + const bool *src_null_ptr = + reinterpret_cast<const bool *>(source + blank_state_.null_offset_); + TypedValue *dst_sum_ptr = + reinterpret_cast<TypedValue *>(destination + blank_state_.sum_offset_); + bool *dst_null_ptr = + reinterpret_cast<bool *>(destination + blank_state_.null_offset_); + *dst_sum_ptr = + merge_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr); + *dst_null_ptr = (*dst_null_ptr) && (*src_null_ptr); +} + TypedValue AggregationHandleSum::finalize(const AggregationState &state) const { - const AggregationStateSum &agg_state = static_cast<const AggregationStateSum&>(state); + const AggregationStateSum &agg_state = + static_cast<const AggregationStateSum &>(state); if (agg_state.null_) { // SUM() over no values is NULL. return result_type_->makeNullValue(); @@ -162,41 +162,29 @@ TypedValue AggregationHandleSum::finalize(const AggregationState &state) const { ColumnVector* AggregationHandleSum::finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const { - return finalizeHashTableHelper<AggregationHandleSum, - AggregationStateHashTable<AggregationStateSum>>( - *result_type_, - hash_table, - group_by_keys); + std::vector<std::vector<TypedValue>> *group_by_keys, + int index) const { + return finalizeHashTableHelperFast<AggregationHandleSum, + AggregationStateFastHashTable>( + *result_type_, hash_table, group_by_keys, index); } -AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle( +AggregationState* +AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle( const AggregationStateHashTableBase &distinctify_hash_table) const { - return aggregateOnDistinctifyHashTableForSingleUnaryHelper< + return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< AggregationHandleSum, - AggregationStateSum>( - distinctify_hash_table); + AggregationStateSum>(distinctify_hash_table); } void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table) const { - aggregateOnDistinctifyHashTableForGroupByUnaryHelper< + AggregationStateHashTableBase *aggregation_hash_table, + std::size_t index) const { + aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< AggregationHandleSum, - AggregationStateSum, - AggregationStateHashTable<AggregationStateSum>>( - distinctify_hash_table, - blank_state_, - aggregation_hash_table); -} - -void AggregationHandleSum::mergeGroupByHashTables( - const AggregationStateHashTableBase &source_hash_table, - AggregationStateHashTableBase *destination_hash_table) const { - mergeGroupByHashTablesHelper<AggregationHandleSum, - AggregationStateSum, - AggregationStateHashTable<AggregationStateSum>>( - source_hash_table, destination_hash_table); + AggregationStateFastHashTable>( + distinctify_hash_table, aggregation_hash_table, index); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/AggregationHandleSum.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp index 3382646..6c334a6 100644 --- a/expressions/aggregation/AggregationHandleSum.hpp +++ b/expressions/aggregation/AggregationHandleSum.hpp @@ -28,6 +28,7 @@ #include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" +#include "storage/FastHashTable.hpp" #include "storage/HashTableBase.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" @@ -57,25 +58,40 @@ class AggregationStateSum : public AggregationState { */ AggregationStateSum(const AggregationStateSum &orig) : sum_(orig.sum_), - null_(orig.null_) { + null_(orig.null_), + sum_offset_(orig.sum_offset_), + null_offset_(orig.null_offset_) {} + + std::size_t getPayloadSize() const { + std::size_t p1 = reinterpret_cast<std::size_t>(&sum_); + std::size_t p2 = reinterpret_cast<std::size_t>(&mutex_); + return (p2 - p1); + } + + const std::uint8_t* getPayloadAddress() const { + return reinterpret_cast<const uint8_t *>(&sum_); } private: friend class AggregationHandleSum; AggregationStateSum() - : sum_(0), null_(true) { - } + : sum_(0), + null_(true), + sum_offset_(0), + null_offset_(reinterpret_cast<std::uint8_t *>(&null_) - + reinterpret_cast<std::uint8_t *>(&sum_)) {} AggregationStateSum(TypedValue &&sum, const bool is_null) - : sum_(std::move(sum)), null_(is_null) { - } + : sum_(std::move(sum)), null_(is_null) {} // TODO(shoban): We might want to specialize sum_ to use atomics for int types // similar to in AggregationStateCount. TypedValue sum_; bool null_; SpinMutex mutex_; + + int sum_offset_, null_offset_; }; /** @@ -83,8 +99,7 @@ class AggregationStateSum : public AggregationState { **/ class AggregationHandleSum : public AggregationConcreteHandle { public: - ~AggregationHandleSum() override { - } + ~AggregationHandleSum() override {} AggregationState* createInitialState() const override { return new AggregationStateSum(blank_state_); @@ -92,11 +107,12 @@ class AggregationHandleSum : public AggregationConcreteHandle { AggregationStateHashTableBase* createGroupByHashTable( const HashTableImplType hash_table_impl, - const std::vector<const Type*> &group_by_types, + const std::vector<const Type *> &group_by_types, const std::size_t estimated_num_groups, StorageManager *storage_manager) const override; - inline void iterateUnaryInl(AggregationStateSum *state, const TypedValue &value) const { + inline void iterateUnaryInl(AggregationStateSum *state, + const TypedValue &value) const { DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature())); if (value.isNull()) return; @@ -105,8 +121,41 @@ class AggregationHandleSum : public AggregationConcreteHandle { state->null_ = false; } + inline void iterateUnaryInlFast(const TypedValue &value, + std::uint8_t *byte_ptr) const { + DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature())); + if (value.isNull()) return; + TypedValue *sum_ptr = + reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_); + bool *null_ptr = + reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_); + *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, value); + *null_ptr = false; + } + + inline void updateStateUnary(const TypedValue &argument, + std::uint8_t *byte_ptr) const override { + if (!block_update_) { + iterateUnaryInlFast(argument, byte_ptr); + } + } + + void blockUpdate() override { block_update_ = true; } + + void allowUpdate() override { block_update_ = false; } + + void initPayload(std::uint8_t *byte_ptr) const override { + TypedValue *sum_ptr = + reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_); + bool *null_ptr = + reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_); + *sum_ptr = blank_state_.sum_; + *null_ptr = true; + } + AggregationState* accumulateColumnVectors( - const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override; + const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) + const override; #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION AggregationState* accumulateValueAccessor( @@ -123,34 +172,51 @@ class AggregationHandleSum : public AggregationConcreteHandle { void mergeStates(const AggregationState &source, AggregationState *destination) const override; + void mergeStatesFast(const std::uint8_t *source, + std::uint8_t *destination) const override; + TypedValue finalize(const AggregationState &state) const override; - inline TypedValue finalizeHashTableEntry(const AggregationState &state) const { - return static_cast<const AggregationStateSum&>(state).sum_; + inline TypedValue finalizeHashTableEntry( + const AggregationState &state) const { + return static_cast<const AggregationStateSum &>(state).sum_; + } + + inline TypedValue finalizeHashTableEntryFast( + const std::uint8_t *byte_ptr) const { + std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr); + TypedValue *sum_ptr = + reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset_); + return *sum_ptr; } ColumnVector* finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const override; + std::vector<std::vector<TypedValue>> *group_by_keys, + int index) const override; /** - * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle() + * @brief Implementation of + * AggregationHandle::aggregateOnDistinctifyHashTableForSingle() * for SUM aggregation. */ AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) const override; + const AggregationStateHashTableBase &distinctify_hash_table) + const override; /** - * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() + * @brief Implementation of + * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() * for SUM aggregation. */ void aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table) const override; + AggregationStateHashTableBase *aggregation_hash_table, + std::size_t index) const override; - void mergeGroupByHashTables( - const AggregationStateHashTableBase &source_hash_table, - AggregationStateHashTableBase *destination_hash_table) const override; + std::size_t getPayloadSize() const override { + return blank_state_.getPayloadSize(); + } private: friend class AggregateFunctionSum; @@ -168,6 +234,8 @@ class AggregationHandleSum : public AggregationConcreteHandle { std::unique_ptr<UncheckedBinaryOperator> fast_operator_; std::unique_ptr<UncheckedBinaryOperator> merge_operator_; + bool block_update_; + DISALLOW_COPY_AND_ASSIGN(AggregationHandleSum); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt index 888d95c..e9503f7 100644 --- a/expressions/aggregation/CMakeLists.txt +++ b/expressions/aggregation/CMakeLists.txt @@ -146,9 +146,11 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl glog quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationHandle + quickstep_storage_FastHashTable quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory + quickstep_threading_SpinMutex quickstep_types_TypedValue quickstep_types_containers_ColumnVector quickstep_utility_Macros) @@ -163,6 +165,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle + quickstep_storage_FastHashTable quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory @@ -180,6 +183,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle + quickstep_storage_FastHashTable quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory @@ -204,6 +208,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle + quickstep_storage_FastHashTable quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory @@ -220,6 +225,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle + quickstep_storage_FastHashTable quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory @@ -236,6 +242,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle + quickstep_storage_FastHashTable quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory @@ -292,6 +299,7 @@ target_link_libraries(AggregationHandle_tests quickstep_expressions_aggregation_AggregationHandleMin quickstep_expressions_aggregation_AggregationHandleSum quickstep_expressions_aggregation_AggregationID + quickstep_storage_AggregationOperationState quickstep_storage_HashTableBase quickstep_storage_StorageManager quickstep_types_CharType http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp b/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp index afc02ec..79d4448 100644 --- a/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp +++ b/expressions/aggregation/tests/AggregationHandleAvg_unittest.cpp @@ -28,6 +28,8 @@ #include "expressions/aggregation/AggregationHandle.hpp" #include "expressions/aggregation/AggregationHandleAvg.hpp" #include "expressions/aggregation/AggregationID.hpp" +#include "storage/AggregationOperationState.hpp" +#include "storage/FastHashTableFactory.hpp" #include "storage/StorageManager.hpp" #include "types/CharType.hpp" #include "types/DateOperatorOverloads.hpp" @@ -53,51 +55,56 @@ namespace quickstep { -class AggregationHandleAvgTest : public::testing::Test { +class AggregationHandleAvgTest : public ::testing::Test { protected: static const int kNumSamples = 100; // Helper method that calls AggregationHandleAvg::iterateUnaryInl() to // aggregate 'value' into '*state'. void iterateHandle(AggregationState *state, const TypedValue &value) { - static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_).iterateUnaryInl( - static_cast<AggregationStateAvg*>(state), - value); + static_cast<const AggregationHandleAvg &>(*aggregation_handle_avg_) + .iterateUnaryInl(static_cast<AggregationStateAvg *>(state), value); } void initializeHandle(const Type &type) { aggregation_handle_avg_.reset( - AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle( - std::vector<const Type*>(1, &type))); + AggregateFunctionFactory::Get(AggregationID::kAvg) + .createHandle(std::vector<const Type *>(1, &type))); aggregation_handle_avg_state_.reset( aggregation_handle_avg_->createInitialState()); } static bool ApplyToTypesTest(TypeID typeID) { - const Type &type = (typeID == kChar || typeID == kVarChar) ? - TypeFactory::GetType(typeID, static_cast<std::size_t>(10)) : - TypeFactory::GetType(typeID); + const Type &type = + (typeID == kChar || typeID == kVarChar) + ? TypeFactory::GetType(typeID, static_cast<std::size_t>(10)) + : TypeFactory::GetType(typeID); - return AggregateFunctionFactory::Get(AggregationID::kAvg).canApplyToTypes( - std::vector<const Type*>(1, &type)); + return AggregateFunctionFactory::Get(AggregationID::kAvg) + .canApplyToTypes(std::vector<const Type *>(1, &type)); } static bool ResultTypeForArgumentTypeTest(TypeID input_type_id, TypeID output_type_id) { - const Type *result_type - = AggregateFunctionFactory::Get(AggregationID::kAvg).resultTypeForArgumentTypes( - std::vector<const Type*>(1, &TypeFactory::GetType(input_type_id))); + const Type *result_type = + AggregateFunctionFactory::Get(AggregationID::kAvg) + .resultTypeForArgumentTypes(std::vector<const Type *>( + 1, &TypeFactory::GetType(input_type_id))); return (result_type->getTypeID() == output_type_id); } template <typename CppType> - static void CheckAvgValue( - CppType expected, - const AggregationHandle &handle, - const AggregationState &state) { + static void CheckAvgValue(CppType expected, + const AggregationHandle &handle, + const AggregationState &state) { EXPECT_EQ(expected, handle.finalize(state).getLiteral<CppType>()); } + template <typename CppType> + static void CheckAvgValue(CppType expected, const TypedValue &value) { + EXPECT_EQ(expected, value.getLiteral<CppType>()); + } + // Static templated method for set a meaningful value to data types. template <typename CppType> static void SetDataType(int value, CppType *data) { @@ -108,7 +115,9 @@ class AggregationHandleAvgTest : public::testing::Test { void checkAggregationAvgGeneric() { const GenericType &type = GenericType::Instance(true); initializeHandle(type); - EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull()); + EXPECT_TRUE( + aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_) + .isNull()); typename GenericType::cpptype val; typename GenericType::cpptype sum; @@ -119,15 +128,16 @@ class AggregationHandleAvgTest : public::testing::Test { if (type.getTypeID() == kInt || type.getTypeID() == kLong) { SetDataType(i - 10, &val); } else { - SetDataType(static_cast<float>(i - 10)/10, &val); + SetDataType(static_cast<float>(i - 10) / 10, &val); } iterateHandle(aggregation_handle_avg_state_.get(), type.makeValue(&val)); sum += val; } iterateHandle(aggregation_handle_avg_state_.get(), type.makeNullValue()); - CheckAvgValue<typename OutputType::cpptype>(static_cast<typename OutputType::cpptype>(sum) / kNumSamples, - *aggregation_handle_avg_, - *aggregation_handle_avg_state_); + CheckAvgValue<typename OutputType::cpptype>( + static_cast<typename OutputType::cpptype>(sum) / kNumSamples, + *aggregation_handle_avg_, + *aggregation_handle_avg_state_); // Test mergeStates(). std::unique_ptr<AggregationState> merge_state( @@ -140,7 +150,7 @@ class AggregationHandleAvgTest : public::testing::Test { if (type.getTypeID() == kInt || type.getTypeID() == kLong) { SetDataType(i - 10, &val); } else { - SetDataType(static_cast<float>(i - 10)/10, &val); + SetDataType(static_cast<float>(i - 10) / 10, &val); } iterateHandle(merge_state.get(), type.makeValue(&val)); sum += val; @@ -155,7 +165,8 @@ class AggregationHandleAvgTest : public::testing::Test { } template <typename GenericType> - ColumnVector *createColumnVectorGeneric(const Type &type, typename GenericType::cpptype *sum) { + ColumnVector* createColumnVectorGeneric(const Type &type, + typename GenericType::cpptype *sum) { NativeColumnVector *column = new NativeColumnVector(type, kNumSamples + 3); typename GenericType::cpptype val; @@ -166,12 +177,12 @@ class AggregationHandleAvgTest : public::testing::Test { if (type.getTypeID() == kInt || type.getTypeID() == kLong) { SetDataType(i - 10, &val); } else { - SetDataType(static_cast<float>(i - 10)/10, &val); + SetDataType(static_cast<float>(i - 10) / 10, &val); } column->appendTypedValue(type.makeValue(&val)); *sum += val; // One NULL in the middle. - if (i == kNumSamples/2) { + if (i == kNumSamples / 2) { column->appendTypedValue(type.makeNullValue()); } } @@ -184,12 +195,15 @@ class AggregationHandleAvgTest : public::testing::Test { void checkAggregationAvgGenericColumnVector() { const GenericType &type = GenericType::Instance(true); initializeHandle(type); - EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull()); + EXPECT_TRUE( + aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_) + .isNull()); typename GenericType::cpptype sum; SetDataType(0, &sum); std::vector<std::unique_ptr<ColumnVector>> column_vectors; - column_vectors.emplace_back(createColumnVectorGeneric<GenericType>(type, &sum)); + column_vectors.emplace_back( + createColumnVectorGeneric<GenericType>(type, &sum)); std::unique_ptr<AggregationState> cv_state( aggregation_handle_avg_->accumulateColumnVectors(column_vectors)); @@ -201,7 +215,8 @@ class AggregationHandleAvgTest : public::testing::Test { *aggregation_handle_avg_, *cv_state); - aggregation_handle_avg_->mergeStates(*cv_state, aggregation_handle_avg_state_.get()); + aggregation_handle_avg_->mergeStates(*cv_state, + aggregation_handle_avg_state_.get()); CheckAvgValue<typename OutputType::cpptype>( static_cast<typename OutputType::cpptype>(sum) / kNumSamples, *aggregation_handle_avg_, @@ -213,16 +228,19 @@ class AggregationHandleAvgTest : public::testing::Test { void checkAggregationAvgGenericValueAccessor() { const GenericType &type = GenericType::Instance(true); initializeHandle(type); - EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull()); + EXPECT_TRUE( + aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_) + .isNull()); typename GenericType::cpptype sum; SetDataType(0, &sum); - std::unique_ptr<ColumnVectorsValueAccessor> accessor(new ColumnVectorsValueAccessor()); + std::unique_ptr<ColumnVectorsValueAccessor> accessor( + new ColumnVectorsValueAccessor()); accessor->addColumn(createColumnVectorGeneric<GenericType>(type, &sum)); std::unique_ptr<AggregationState> va_state( - aggregation_handle_avg_->accumulateValueAccessor(accessor.get(), - std::vector<attribute_id>(1, 0))); + aggregation_handle_avg_->accumulateValueAccessor( + accessor.get(), std::vector<attribute_id>(1, 0))); // Test the state generated directly by accumulateValueAccessor(), and also // test after merging back. @@ -231,7 +249,8 @@ class AggregationHandleAvgTest : public::testing::Test { *aggregation_handle_avg_, *va_state); - aggregation_handle_avg_->mergeStates(*va_state, aggregation_handle_avg_state_.get()); + aggregation_handle_avg_->mergeStates(*va_state, + aggregation_handle_avg_state_.get()); CheckAvgValue<typename OutputType::cpptype>( static_cast<typename OutputType::cpptype>(sum) / kNumSamples, *aggregation_handle_avg_, @@ -255,12 +274,14 @@ void AggregationHandleAvgTest::CheckAvgValue<double>( } template <> -void AggregationHandleAvgTest::SetDataType<DatetimeIntervalLit>(int value, DatetimeIntervalLit *data) { +void AggregationHandleAvgTest::SetDataType<DatetimeIntervalLit>( + int value, DatetimeIntervalLit *data) { data->interval_ticks = value; } template <> -void AggregationHandleAvgTest::SetDataType<YearMonthIntervalLit>(int value, YearMonthIntervalLit *data) { +void AggregationHandleAvgTest::SetDataType<YearMonthIntervalLit>( + int value, YearMonthIntervalLit *data) { data->months = value; } @@ -307,11 +328,13 @@ TEST_F(AggregationHandleAvgTest, DoubleTypeColumnVectorTest) { } TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeColumnVectorTest) { - checkAggregationAvgGenericColumnVector<DatetimeIntervalType, DatetimeIntervalType>(); + checkAggregationAvgGenericColumnVector<DatetimeIntervalType, + DatetimeIntervalType>(); } TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeColumnVectorTest) { - checkAggregationAvgGenericColumnVector<YearMonthIntervalType, YearMonthIntervalType>(); + checkAggregationAvgGenericColumnVector<YearMonthIntervalType, + YearMonthIntervalType>(); } #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION @@ -332,11 +355,13 @@ TEST_F(AggregationHandleAvgTest, DoubleTypeValueAccessorTest) { } TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeValueAccessorTest) { - checkAggregationAvgGenericValueAccessor<DatetimeIntervalType, DatetimeIntervalType>(); + checkAggregationAvgGenericValueAccessor<DatetimeIntervalType, + DatetimeIntervalType>(); } TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeValueAccessorTest) { - checkAggregationAvgGenericValueAccessor<YearMonthIntervalType, YearMonthIntervalType>(); + checkAggregationAvgGenericValueAccessor<YearMonthIntervalType, + YearMonthIntervalType>(); } #endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION @@ -365,38 +390,53 @@ TEST_F(AggregationHandleAvgDeathTest, WrongTypeTest) { double double_val = 0; float float_val = 0; - iterateHandle(aggregation_handle_avg_state_.get(), int_non_null_type.makeValue(&int_val)); + iterateHandle(aggregation_handle_avg_state_.get(), + int_non_null_type.makeValue(&int_val)); - EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), long_type.makeValue(&long_val)), ""); - EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), double_type.makeValue(&double_val)), ""); - EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), float_type.makeValue(&float_val)), ""); - EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), char_type.makeValue("asdf", 5)), ""); - EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), varchar_type.makeValue("asdf", 5)), ""); + EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), + long_type.makeValue(&long_val)), + ""); + EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), + double_type.makeValue(&double_val)), + ""); + EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), + float_type.makeValue(&float_val)), + ""); + EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), + char_type.makeValue("asdf", 5)), + ""); + EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), + varchar_type.makeValue("asdf", 5)), + ""); // Test mergeStates() with incorrectly typed handles. std::unique_ptr<AggregationHandle> aggregation_handle_avg_double( - AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle( - std::vector<const Type*>(1, &double_type))); + AggregateFunctionFactory::Get(AggregationID::kAvg) + .createHandle(std::vector<const Type *>(1, &double_type))); std::unique_ptr<AggregationState> aggregation_state_avg_merge_double( aggregation_handle_avg_double->createInitialState()); - static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_double).iterateUnaryInl( - static_cast<AggregationStateAvg*>(aggregation_state_avg_merge_double.get()), - double_type.makeValue(&double_val)); - EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_double, - aggregation_handle_avg_state_.get()), - ""); + static_cast<const AggregationHandleAvg &>(*aggregation_handle_avg_double) + .iterateUnaryInl(static_cast<AggregationStateAvg *>( + aggregation_state_avg_merge_double.get()), + double_type.makeValue(&double_val)); + EXPECT_DEATH( + aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_double, + aggregation_handle_avg_state_.get()), + ""); std::unique_ptr<AggregationHandle> aggregation_handle_avg_float( - AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle( - std::vector<const Type*>(1, &float_type))); + AggregateFunctionFactory::Get(AggregationID::kAvg) + .createHandle(std::vector<const Type *>(1, &float_type))); std::unique_ptr<AggregationState> aggregation_state_avg_merge_float( aggregation_handle_avg_float->createInitialState()); - static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_float).iterateUnaryInl( - static_cast<AggregationStateAvg*>(aggregation_state_avg_merge_float.get()), - float_type.makeValue(&float_val)); - EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_float, - aggregation_handle_avg_state_.get()), - ""); + static_cast<const AggregationHandleAvg &>(*aggregation_handle_avg_float) + .iterateUnaryInl(static_cast<AggregationStateAvg *>( + aggregation_state_avg_merge_float.get()), + float_type.makeValue(&float_val)); + EXPECT_DEATH( + aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_float, + aggregation_handle_avg_state_.get()), + ""); } #endif @@ -417,8 +457,10 @@ TEST_F(AggregationHandleAvgTest, ResultTypeForArgumentTypeTest) { EXPECT_TRUE(ResultTypeForArgumentTypeTest(kLong, kDouble)); EXPECT_TRUE(ResultTypeForArgumentTypeTest(kFloat, kDouble)); EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble)); - EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval)); - EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval)); + EXPECT_TRUE( + ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval)); + EXPECT_TRUE( + ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval)); } TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) { @@ -426,25 +468,28 @@ TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) { initializeHandle(long_non_null_type); storage_manager_.reset(new StorageManager("./test_avg_data")); std::unique_ptr<AggregationStateHashTableBase> source_hash_table( - aggregation_handle_avg_->createGroupByHashTable( - HashTableImplType::kSimpleScalarSeparateChaining, + AggregationStateFastHashTableFactory::CreateResizable( + HashTableImplType::kSeparateChaining, std::vector<const Type *>(1, &long_non_null_type), 10, + {aggregation_handle_avg_.get()->getPayloadSize()}, + {aggregation_handle_avg_.get()}, storage_manager_.get())); std::unique_ptr<AggregationStateHashTableBase> destination_hash_table( - aggregation_handle_avg_->createGroupByHashTable( - HashTableImplType::kSimpleScalarSeparateChaining, + AggregationStateFastHashTableFactory::CreateResizable( + HashTableImplType::kSeparateChaining, std::vector<const Type *>(1, &long_non_null_type), 10, + {aggregation_handle_avg_.get()->getPayloadSize()}, + {aggregation_handle_avg_.get()}, storage_manager_.get())); - AggregationStateHashTable<AggregationStateAvg> *destination_hash_table_derived = - static_cast<AggregationStateHashTable<AggregationStateAvg> *>( + AggregationStateFastHashTable *destination_hash_table_derived = + static_cast<AggregationStateFastHashTable *>( destination_hash_table.get()); - AggregationStateHashTable<AggregationStateAvg> *source_hash_table_derived = - static_cast<AggregationStateHashTable<AggregationStateAvg> *>( - source_hash_table.get()); + AggregationStateFastHashTable *source_hash_table_derived = + static_cast<AggregationStateFastHashTable *>(source_hash_table.get()); AggregationHandleAvg *aggregation_handle_avg_derived = static_cast<AggregationHandleAvg *>(aggregation_handle_avg_.get()); @@ -496,36 +541,56 @@ TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) { exclusive_key_source_state.get(), exclusive_key_source_avg_val); // Add the key-state pairs to the hash tables. - source_hash_table_derived->putCompositeKey(common_key, - *common_key_source_state); - destination_hash_table_derived->putCompositeKey( - common_key, *common_key_destination_state); - source_hash_table_derived->putCompositeKey(exclusive_source_key, - *exclusive_key_source_state); - destination_hash_table_derived->putCompositeKey( - exclusive_destination_key, *exclusive_key_destination_state); + unsigned char buffer[100]; + buffer[0] = '\0'; + memcpy(buffer + 1, + common_key_source_state.get()->getPayloadAddress(), + aggregation_handle_avg_.get()->getPayloadSize()); + source_hash_table_derived->putCompositeKey(common_key, buffer); + + memcpy(buffer + 1, + common_key_destination_state.get()->getPayloadAddress(), + aggregation_handle_avg_.get()->getPayloadSize()); + destination_hash_table_derived->putCompositeKey(common_key, buffer); + + memcpy(buffer + 1, + exclusive_key_source_state.get()->getPayloadAddress(), + aggregation_handle_avg_.get()->getPayloadSize()); + source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer); + + memcpy(buffer + 1, + exclusive_key_destination_state.get()->getPayloadAddress(), + aggregation_handle_avg_.get()->getPayloadSize()); + destination_hash_table_derived->putCompositeKey(exclusive_destination_key, + buffer); EXPECT_EQ(2u, destination_hash_table_derived->numEntries()); EXPECT_EQ(2u, source_hash_table_derived->numEntries()); - aggregation_handle_avg_->mergeGroupByHashTables(*source_hash_table, - destination_hash_table.get()); + AggregationOperationState::mergeGroupByHashTables( + source_hash_table.get(), destination_hash_table.get()); EXPECT_EQ(3u, destination_hash_table_derived->numEntries()); CheckAvgValue<double>( (common_key_destination_avg_val.getLiteral<std::int64_t>() + - common_key_source_avg_val.getLiteral<std::int64_t>()) / static_cast<double>(2), - *aggregation_handle_avg_derived, - *(destination_hash_table_derived->getSingleCompositeKey(common_key))); - CheckAvgValue<double>(exclusive_key_destination_avg_val.getLiteral<std::int64_t>(), - *aggregation_handle_avg_derived, - *(destination_hash_table_derived->getSingleCompositeKey( - exclusive_destination_key))); - CheckAvgValue<double>(exclusive_key_source_avg_val.getLiteral<std::int64_t>(), - *aggregation_handle_avg_derived, - *(source_hash_table_derived->getSingleCompositeKey( - exclusive_source_key))); + common_key_source_avg_val.getLiteral<std::int64_t>()) / + static_cast<double>(2), + aggregation_handle_avg_derived->finalizeHashTableEntryFast( + destination_hash_table_derived->getSingleCompositeKey(common_key) + + 1)); + CheckAvgValue<double>( + exclusive_key_destination_avg_val.getLiteral<std::int64_t>(), + aggregation_handle_avg_derived->finalizeHashTableEntryFast( + destination_hash_table_derived->getSingleCompositeKey( + exclusive_destination_key) + + 1)); + CheckAvgValue<double>( + exclusive_key_source_avg_val.getLiteral<std::int64_t>(), + aggregation_handle_avg_derived->finalizeHashTableEntryFast( + source_hash_table_derived->getSingleCompositeKey( + exclusive_source_key) + + 1)); } } // namespace quickstep