Repository: incubator-quickstep Updated Branches: refs/heads/untyped-agg 9ccd5a311 -> c41451dbd
Updates Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c41451db Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c41451db Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c41451db Branch: refs/heads/untyped-agg Commit: c41451dbd01fbc11609f996fe9357e091eb49307 Parents: 9ccd5a3 Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Tue Oct 18 09:53:43 2016 -0500 Committer: Jianqiao Zhu <jianq...@cs.wisc.edu> Committed: Tue Oct 18 09:53:43 2016 -0500 ---------------------------------------------------------------------- .../aggregation/AggregationHandleAvg.cpp | 270 ++++++++----------- .../aggregation/AggregationHandleAvg.hpp | 33 ++- .../aggregation/AggregationHandleCount.cpp | 6 +- .../aggregation/AggregationHandleSum.cpp | 1 + storage/AggregationOperationState.cpp | 63 ++--- storage/AggregationOperationState.hpp | 1 + storage/AggregationStateHashTable.hpp | 157 +++++++---- storage/AggregationStateManager.hpp | 48 ++-- storage/HashTableBase.hpp | 3 + storage/StorageBlock.cpp | 91 +++---- storage/StorageBlock.hpp | 6 +- 11 files changed, 341 insertions(+), 338 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/expressions/aggregation/AggregationHandleAvg.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp index 47f3f41..e90f10f 100644 --- a/expressions/aggregation/AggregationHandleAvg.cpp +++ b/expressions/aggregation/AggregationHandleAvg.cpp @@ -24,8 +24,6 @@ #include <vector> #include "catalog/CatalogTypedefs.hpp" -#include "storage/HashTable.hpp" -#include "storage/HashTableFactory.hpp" #include "types/Type.hpp" #include "types/TypeFactory.hpp" #include "types/TypeID.hpp" @@ -40,165 +38,113 @@ namespace quickstep { class StorageManager; -AggregationHandleAvg::AggregationHandleAvg(const Type &type) {} -// : argument_type_(type), block_update_(false) { -// // We sum Int as Long and Float as Double so that we have more headroom when -// // adding many values. -// TypeID type_precision_id; -// switch (type.getTypeID()) { -// case kInt: -// case kLong: -// type_precision_id = kLong; -// break; -// case kFloat: -// case kDouble: -// type_precision_id = kDouble; -// break; -// default: -// type_precision_id = type.getTypeID(); -// break; -// } -// -// const Type &sum_type = TypeFactory::GetType(type_precision_id); -// blank_state_.sum_ = sum_type.makeZeroValue(); -// blank_state_.count_ = 0; -// -// // Make operators to do arithmetic: -// // Add operator for summing argument values. -// fast_add_operator_.reset( -// BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd) -// .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type_)); -// // Add operator for merging states. -// merge_add_operator_.reset( -// BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd) -// .makeUncheckedBinaryOperatorForTypes(sum_type, sum_type)); -// // Divide operator for dividing sum by count to get final average. -// divide_operator_.reset( -// BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide) -// .makeUncheckedBinaryOperatorForTypes(sum_type, -// TypeFactory::GetType(kDouble))); -// -// // Result is nullable, because AVG() over 0 values (or all NULL values) is -// // NULL. -// result_type_ = -// &(BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide) -// .resultTypeForArgumentTypes(sum_type, TypeFactory::GetType(kDouble)) -// ->getNullableVersion()); -//} -// -//AggregationStateHashTableBase* AggregationHandleAvg::createGroupByHashTable( -// const HashTableImplType hash_table_impl, -// const std::vector<const Type *> &group_by_types, -// const std::size_t estimated_num_groups, -// StorageManager *storage_manager) const { -// return AggregationStateHashTableFactory<AggregationStateAvg>::CreateResizable( -// hash_table_impl, group_by_types, estimated_num_groups, storage_manager); -//} -// -//AggregationState* AggregationHandleAvg::accumulateColumnVectors( -// const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const { -// DCHECK_EQ(1u, column_vectors.size()) -// << "Got wrong number of ColumnVectors for AVG: " << column_vectors.size(); -// -// AggregationStateAvg *state = new AggregationStateAvg(blank_state_); -// std::size_t count = 0; -// state->sum_ = fast_add_operator_->accumulateColumnVector( -// state->sum_, *column_vectors.front(), &count); -// state->count_ = count; -// return state; -//} -// -//#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION -//AggregationState* AggregationHandleAvg::accumulateValueAccessor( -// ValueAccessor *accessor, -// const std::vector<attribute_id> &accessor_ids) const { -// DCHECK_EQ(1u, accessor_ids.size()) -// << "Got wrong number of attributes for AVG: " << accessor_ids.size(); -// -// AggregationStateAvg *state = new AggregationStateAvg(blank_state_); -// std::size_t count = 0; -// state->sum_ = fast_add_operator_->accumulateValueAccessor( -// state->sum_, accessor, accessor_ids.front(), &count); -// state->count_ = count; -// return state; -//} -//#endif -// -//void AggregationHandleAvg::aggregateValueAccessorIntoHashTable( -// ValueAccessor *accessor, -// const std::vector<attribute_id> &argument_ids, -// const std::vector<attribute_id> &group_by_key_ids, -// AggregationStateHashTableBase *hash_table) const { -// DCHECK_EQ(1u, argument_ids.size()) -// << "Got wrong number of arguments for AVG: " << argument_ids.size(); -//} -// -//void AggregationHandleAvg::mergeStates(const AggregationState &source, -// AggregationState *destination) const { -// const AggregationStateAvg &avg_source = -// static_cast<const AggregationStateAvg &>(source); -// AggregationStateAvg *avg_destination = -// static_cast<AggregationStateAvg *>(destination); -// -// SpinMutexLock lock(avg_destination->mutex_); -// avg_destination->count_ += avg_source.count_; -// avg_destination->sum_ = merge_add_operator_->applyToTypedValues( -// avg_destination->sum_, avg_source.sum_); -//} -// -//void AggregationHandleAvg::mergeStatesFast(const std::uint8_t *source, -// std::uint8_t *destination) const { -// const TypedValue *src_sum_ptr = -// reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_); -// const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>( -// source + blank_state_.count_offset_); -// TypedValue *dst_sum_ptr = -// reinterpret_cast<TypedValue *>(destination + blank_state_.sum_offset_); -// std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>( -// destination + blank_state_.count_offset_); -// (*dst_count_ptr) += (*src_count_ptr); -// *dst_sum_ptr = -// merge_add_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr); -//} -// -//TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const { -// const AggregationStateAvg &agg_state = -// static_cast<const AggregationStateAvg &>(state); -// if (agg_state.count_ == 0) { -// // AVG() over no values is NULL. -// return result_type_->makeNullValue(); -// } else { -// // Divide sum by count to get final average. -// return divide_operator_->applyToTypedValues( -// agg_state.sum_, TypedValue(static_cast<double>(agg_state.count_))); -// } -//} -// -//ColumnVector* AggregationHandleAvg::finalizeHashTable( -// const AggregationStateHashTableBase &hash_table, -// std::vector<std::vector<TypedValue>> *group_by_keys, -// int index) const { -// return finalizeHashTableHelperFast<AggregationHandleAvg, -// AggregationStateFastHashTable>( -// *result_type_, hash_table, group_by_keys, index); -//} -// -//AggregationState* -//AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle( -// const AggregationStateHashTableBase &distinctify_hash_table) const { -// return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< -// AggregationHandleAvg, -// AggregationStateAvg>(distinctify_hash_table); -//} -// -//void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy( -// const AggregationStateHashTableBase &distinctify_hash_table, -// AggregationStateHashTableBase *aggregation_hash_table, -// std::size_t index) const { -// aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< -// AggregationHandleAvg, -// AggregationStateFastHashTable>( -// distinctify_hash_table, aggregation_hash_table, index); -//} +AggregationHandleAvg::AggregationHandleAvg(const Type &argument_type) { + // We sum Int as Long and Float as Double so that we have more headroom when + // adding many values. + TypeID type_precision_id; + switch (argument_type.getTypeID()) { + case kInt: + case kLong: + type_precision_id = kLong; + break; + case kFloat: + case kDouble: + type_precision_id = kDouble; + break; + default: + type_precision_id = argument_type.getTypeID(); + break; + } + + const Type &sum_type = TypeFactory::GetType(type_precision_id); + const Type &count_type = TypeFactory::GetType(CountType::kStaticTypeID); + + const std::size_t sum_state_size = sum_type.maximumByteLength(); + count_offset_ = sum_state_size; + state_size_ = sum_state_size + sizeof(CountCppType); + + blank_state_.reset(state_size_, false); + sum_type.makeZeroValue(blank_state_.get()); + count_type.makeZeroValue(static_cast<char *>(blank_state_.get()) + sum_state_size); + + tv_blank_sum_ = sum_type.makeZeroValue(); + + // Make operators to do arithmetic: + // Add operator for summing argument values. + accumulate_add_operator_.reset( + BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd) + .makeUncheckedBinaryOperatorForTypes(sum_type, argument_type)); + const auto accumulate_add_functor = accumulate_add_operator_->getMergeFunctor(); + accumulate_functor_ = [sum_state_size, accumulate_add_functor]( + void *state, const void *value) { + accumulate_add_functor(state, value); + void *count_ptr = static_cast<char *>(state) + sum_state_size; + ++(*static_cast<CountCppType *>(count_ptr)); + }; + + // Add operator for merging states. + merge_add_operator_.reset( + BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd) + .makeUncheckedBinaryOperatorForTypes(sum_type, sum_type)); + const auto merge_add_functor = merge_add_operator_->getMergeFunctor(); + merge_functor_ = [sum_state_size, merge_add_functor]( + void *destination_state, const void *source_state) { + merge_add_functor(destination_state, source_state); + void *destination_count_ptr = + static_cast<char *>(destination_state) + sum_state_size; + const void *source_count_ptr = + static_cast<const char *>(source_state) + sum_state_size; + *static_cast<CountCppType *>(destination_count_ptr) += + *static_cast<const CountCppType *>(source_count_ptr); + }; + + // Divide operator for dividing sum by count to get final average. + divide_operator_.reset( + BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide) + .makeUncheckedBinaryOperatorForTypes(sum_type, + TypeFactory::GetType(kDouble))); + const auto divide_functor = divide_operator_->getFunctor(); + finalize_functor_ = [sum_state_size, divide_functor]( + void *result, const void *state) { + const void *count_ptr = static_cast<const char *>(state) + sum_state_size; + const double count = *static_cast<const CountCppType *>(count_ptr); + divide_functor(result, state, &count); + }; + + result_type_ = + BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide) + .resultTypeForArgumentTypes(sum_type, TypeFactory::GetType(kDouble)); +} + +void AggregationHandleAvg::accumulateColumnVectors( + void *state, + const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const { + DCHECK_EQ(1u, column_vectors.size()) + << "Got wrong number of ColumnVectors for AVG: " << column_vectors.size(); + + std::size_t count = 0; + TypedValue cv_sum = accumulate_add_operator_->accumulateColumnVector( + tv_blank_sum_, *column_vectors.front(), &count); + cv_sum.copyInto(state); + void *count_ptr = static_cast<char *>(state) + count_offset_; + *static_cast<CountCppType *>(count_ptr) = count; +} + +#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION +void AggregationHandleAvg::accumulateValueAccessor( + void *state, + ValueAccessor *accessor, + const std::vector<attribute_id> &accessor_ids) const { + DCHECK_EQ(1u, accessor_ids.size()) + << "Got wrong number of attributes for AVG: " << accessor_ids.size(); + + std::size_t count = 0; + TypedValue cv_sum = accumulate_add_operator_->accumulateValueAccessor( + tv_blank_sum_, accessor, accessor_ids.front(), &count); + cv_sum.copyInto(state); + void *count_ptr = static_cast<char *>(state) + count_offset_; + *static_cast<CountCppType *>(count_ptr) = count; +} +#endif } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/expressions/aggregation/AggregationHandleAvg.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp index cc5adc8..416cf53 100644 --- a/expressions/aggregation/AggregationHandleAvg.hpp +++ b/expressions/aggregation/AggregationHandleAvg.hpp @@ -28,6 +28,7 @@ #include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationHandle.hpp" #include "storage/HashTableBase.hpp" +#include "types/LongType.hpp" #include "types/Type.hpp" #include "types/TypedValue.hpp" #include "types/operations/binary_operations/BinaryOperation.hpp" @@ -52,6 +53,17 @@ class AggregationHandleAvg : public AggregationHandle { public: ~AggregationHandleAvg() override {} + void accumulateColumnVectors( + void *state, + const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override; + +#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION + void accumulateValueAccessor( + void *state, + ValueAccessor *accessor, + const std::vector<attribute_id> &accessor_ids) const override; +#endif + private: friend class AggregateFunctionAvg; @@ -60,14 +72,19 @@ class AggregationHandleAvg : public AggregationHandle { * * @param type Type of the avg value. **/ - explicit AggregationHandleAvg(const Type &type); - -// const Type &argument_type_; -// const Type *result_type_; -// AggregationStateAvg blank_state_; -// std::unique_ptr<UncheckedBinaryOperator> fast_add_operator_; -// std::unique_ptr<UncheckedBinaryOperator> merge_add_operator_; -// std::unique_ptr<UncheckedBinaryOperator> divide_operator_; + explicit AggregationHandleAvg(const Type &argument_type); + + typedef LongType CountType; + typedef CountType::cpptype CountCppType; + + // TODO: temporary + TypedValue tv_blank_sum_; + + std::size_t count_offset_; + + std::unique_ptr<UncheckedBinaryOperator> accumulate_add_operator_; + std::unique_ptr<UncheckedBinaryOperator> merge_add_operator_; + std::unique_ptr<UncheckedBinaryOperator> divide_operator_; DISALLOW_COPY_AND_ASSIGN(AggregationHandleAvg); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/expressions/aggregation/AggregationHandleCount.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp index c095a82..12e2b86 100644 --- a/expressions/aggregation/AggregationHandleCount.cpp +++ b/expressions/aggregation/AggregationHandleCount.cpp @@ -49,7 +49,10 @@ class ValueAccessor; template <bool count_star, bool nullable_type> AggregationHandleCount<count_star, nullable_type>::AggregationHandleCount() { state_size_ = sizeof(ResultCppType); - blank_state_.reset(state_size_, true); + blank_state_.reset(state_size_, false); + + result_type_ = &TypeFactory::GetType(ResultType::kStaticTypeID); + result_type_->makeZeroValue(blank_state_.get()); accumulate_functor_ = [](void *state, const void *value) { *static_cast<ResultCppType *>(state) += 1; @@ -65,7 +68,6 @@ AggregationHandleCount<count_star, nullable_type>::AggregationHandleCount() { *static_cast<const ResultCppType *>(state); }; - result_type_ = &TypeFactory::GetType(ResultType::kStaticTypeID); } template <bool count_star, bool nullable_type> http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/expressions/aggregation/AggregationHandleSum.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp index 4e77ed0..61a0774 100644 --- a/expressions/aggregation/AggregationHandleSum.cpp +++ b/expressions/aggregation/AggregationHandleSum.cpp @@ -66,6 +66,7 @@ AggregationHandleSum::AggregationHandleSum(const Type &argument_type) { state_size_ = sum_type.maximumByteLength(); blank_state_.reset(state_size_, false); + sum_type.makeZeroValue(blank_state_.get()); tv_blank_state_ = sum_type.makeZeroValue(); // Make operators to do arithmetic: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index 50e7c06..89b7b72 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -83,15 +83,15 @@ AggregationOperationState::AggregationOperationState( group_by_types.emplace_back(&group_by_element->getType()); } - std::vector<AggregationHandle *> group_by_handles; - group_by_handles.clear(); + std::vector<AggregationHandle *> aggregation_handles; + aggregation_handles.clear(); // Set up each individual aggregate in this operation. for (std::size_t i = 0; i < aggregate_functions.size(); ++i) { // Get the Types of this aggregate's arguments so that we can create an // AggregationHandle. std::vector<const Type *> argument_types; - for (const std::unique_ptr<const Scalar> &argument : arguments[i]) { + for (const std::unique_ptr<const Scalar> &argument : arguments_[i]) { argument_types.emplace_back(&argument->getType()); } @@ -107,9 +107,9 @@ AggregationOperationState::AggregationOperationState( if (!group_by_list_.empty()) { // TODO(jianqiao): handle DISTINCT aggregation. - // if (is_distinct[i]) { + // if (is_distinct_[i]) { // } - group_by_handles.emplace_back(handles_.back()); + aggregation_handles.emplace_back(handles_.back()); } else { // Aggregation without GROUP BY: create a single global state. single_states_.emplace_back(handles_.back()->createInitialState()); @@ -119,8 +119,8 @@ AggregationOperationState::AggregationOperationState( // relation. If so, remember the attribute IDs so that we can do copy // elision when actually performing the aggregation. std::vector<attribute_id> local_arguments_as_attributes; - local_arguments_as_attributes.reserve(arguments[i].size()); - for (const std::unique_ptr<const Scalar> &argument : arguments[i]) { + local_arguments_as_attributes.reserve(arguments_[i].size()); + for (const std::unique_ptr<const Scalar> &argument : arguments_[i]) { const attribute_id argument_id = argument->getAttributeIdForValueAccessor(); if (argument_id == -1) { @@ -139,12 +139,21 @@ AggregationOperationState::AggregationOperationState( } } - if (!group_by_handles.empty()) { + if (!group_by_list_.empty()) { + // TODO: handle non-fast-path case + for (const auto &group_by_attribute : group_by_list_) { + const attribute_id attr_id = + group_by_attribute->getAttributeIdForValueAccessor(); + CHECK_NE(attr_id, kInvalidAttributeID); + + group_by_attribute_ids_.emplace_back(attr_id); + } + // Aggregation with GROUP BY: create a HashTable pool for per-group states. group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries, hash_table_impl_type, group_by_types, - group_by_handles, + aggregation_handles, storage_manager)); } } @@ -349,24 +358,14 @@ void AggregationOperationState::aggregateBlockHashTable( BlockReference block( storage_manager_->getBlock(input_block, input_relation_)); - // If there is a filter predicate, 'reuse_matches' holds the set of matching - // tuples so that it can be reused across multiple aggregates (i.e. we only - // pay the cost of evaluating the predicate once). - std::unique_ptr<TupleIdSequence> reuse_matches; - - // This holds values of all the GROUP BY attributes so that the can be reused - // across multiple aggregates (i.e. we only pay the cost of evaluatin the - // GROUP BY expressions once). - std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors; - - for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { - if (is_distinct_[agg_idx]) { - // Call StorageBlock::aggregateDistinct() to insert the GROUP BY expression - // values and the aggregation arguments together as keys directly into the - // (threadsafe) shared global distinctify HashTable for this aggregate. - // TODO(jianqiao): handle DISTINCT aggregation. - } - } +// for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { +// if (is_distinct_[agg_idx]) { +// // Call StorageBlock::aggregateDistinct() to insert the GROUP BY expression +// // values and the aggregation arguments together as keys directly into the +// // (threadsafe) shared global distinctify HashTable for this aggregate. +// // TODO(jianqiao): handle DISTINCT aggregation. +// } +// } // Call StorageBlock::aggregateGroupBy() to aggregate this block's values // directly into the (threadsafe) shared global HashTable for this @@ -376,11 +375,10 @@ void AggregationOperationState::aggregateBlockHashTable( DCHECK(agg_hash_table != nullptr); block->aggregateGroupBy(arguments_, - group_by_list_, + group_by_attribute_ids_, predicate_.get(), - agg_hash_table, - &reuse_matches, - &reuse_group_by_vectors); + agg_hash_table); + group_by_hashtable_pool_->returnHashTable(agg_hash_table); } @@ -437,6 +435,9 @@ void AggregationOperationState::finalizeHashTable( mergeGroupByHashTables(final_hash_table.get(), hash_table.get()); } +// static_cast<ThreadPrivateAggregationStateHashTable *>( +// final_hash_table.get())->print(); + // Bulk-insert the complete result. std::unique_ptr<AggregationResultIterator> results( final_hash_table->createResultIterator()); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index 9fa3bd2..869f391 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -191,6 +191,7 @@ class AggregationOperationState { const CatalogRelationSchema &input_relation_; std::unique_ptr<const Predicate> predicate_; std::vector<std::unique_ptr<const Scalar>> group_by_list_; + std::vector<attribute_id> group_by_attribute_ids_; // Each individual aggregate in this operation has an AggregationHandle and // some number of Scalar arguments. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/storage/AggregationStateHashTable.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationStateHashTable.hpp b/storage/AggregationStateHashTable.hpp index 85a6bdc..ee69725 100644 --- a/storage/AggregationStateHashTable.hpp +++ b/storage/AggregationStateHashTable.hpp @@ -46,6 +46,7 @@ #include "threading/SpinSharedMutex.hpp" #include "types/Type.hpp" #include "types/TypeFunctors.hpp" +#include "types/containers/ColumnVectorsValueAccessor.hpp" #include "utility/Alignment.hpp" #include "utility/InlineMemcpy.hpp" #include "utility/Macros.hpp" @@ -112,65 +113,119 @@ class ThreadPrivateAggregationStateHashTable : public AggregationStateHashTableB } bool upsertValueAccessor(ValueAccessor *accessor, + ColumnVectorsValueAccessor *temp_accessor, const attribute_id key_attr_id, const std::vector<attribute_id> &argument_ids) override { - if (key_manager_.isKeyNullable()) { - return upsertValueAccessorInternal<true>( - accessor, key_attr_id, argument_ids); +// if (key_manager_.isKeyNullable()) { +// return upsertValueAccessorInternal<true>( +// accessor, key_attr_id, argument_ids); +// } else { +// return upsertValueAccessorInternal<false>( +// accessor, key_attr_id, argument_ids); +// } + return true; + } +// +// template <bool check_for_null_keys> +// bool upsertValueAccessorInternal(ValueAccessor *accessor, +// const attribute_id key_attr_id, +// const std::vector<attribute_id> &argument_ids) { +// return InvokeOnAnyValueAccessor( +// accessor, +// [&](auto *accessor) -> bool { // NOLINT(build/c++11) +// accessor->beginIteration(); +// while (accessor->next()) { +// const void *key = accessor->template getUntypedValue<check_for_null_keys>(key_attr_id); +// if (check_for_null_keys && key == nullptr) { +// continue; +// } +// void *bucket = locateBucket(key); +// payload_manager_.template updateStates<check_for_null_keys>( +// bucket, accessor, argument_ids); +// } +// return true; +// }); +// } + + bool upsertValueAccessorCompositeKey(ValueAccessor *accessor, + ColumnVectorsValueAccessor *temp_accessor, + const std::vector<attribute_id> &key_attr_ids, + const std::vector<attribute_id> &argument_ids) override { +// if (key_attr_ids.size() == 1) { +// upsertValueAccessor(accessor, +// key_attr_ids.front(), +// argument_ids); +// } + + if (temp_accessor == nullptr) { + if (key_manager_.isKeyNullable()) { + return upsertValueAccessorCompositeKeyInternal<true>( + accessor, key_attr_ids, argument_ids); + } else { + return upsertValueAccessorCompositeKeyInternal<false>( + accessor, key_attr_ids, argument_ids); + } } else { - return upsertValueAccessorInternal<false>( - accessor, key_attr_id, argument_ids); + if (key_manager_.isKeyNullable()) { + return upsertValueAccessorCompositeKeyInternal<true>( + accessor, temp_accessor, key_attr_ids, argument_ids); + } else { + return upsertValueAccessorCompositeKeyInternal<false>( + accessor, temp_accessor, key_attr_ids, argument_ids); + } } } template <bool check_for_null_keys> - bool upsertValueAccessorInternal(ValueAccessor *accessor, - const attribute_id key_attr_id, - const std::vector<attribute_id> &argument_ids) { + bool upsertValueAccessorCompositeKeyInternal(ValueAccessor *accessor, + const std::vector<attribute_id> &key_attr_ids, + const std::vector<attribute_id> &argument_ids) { return InvokeOnAnyValueAccessor( accessor, [&](auto *accessor) -> bool { // NOLINT(build/c++11) accessor->beginIteration(); + void *prealloc_bucket = allocateBucket(); while (accessor->next()) { - const void *key = accessor->template getUntypedValue<check_for_null_keys>(key_attr_id); - if (check_for_null_keys && key == nullptr) { - continue; - } - bool is_empty; - void *bucket = locateBucket(key, &is_empty); - if (is_empty) { - payload_manager_.initializeStates(bucket); + if (check_for_null_keys) { + const bool is_null = + key_manager_.writeNullableUntypedKeyFromValueAccessorToBucket( + accessor, + key_attr_ids, + prealloc_bucket); + if (is_null) { + continue; + } } else { - payload_manager_.template updateStates<check_for_null_keys>( - bucket, accessor, argument_ids); + key_manager_.writeUntypedKeyFromValueAccessorToBucket( + accessor, + key_attr_ids, + prealloc_bucket); } + void *bucket = locateBucketWithPrealloc(&prealloc_bucket); + payload_manager_.updateStates( + bucket, accessor, argument_ids); } + // Reclaim the last unused bucket + --buckets_allocated_; return true; }); - } - - bool upsertValueAccessorCompositeKey(ValueAccessor *accessor, - const std::vector<attribute_id> &key_attr_ids, - const std::vector<attribute_id> &argument_ids) override { - if (key_manager_.isKeyNullable()) { - return upsertValueAccessorCompositeKeyInternal<true>( - accessor, key_attr_ids, argument_ids); - } else { - return upsertValueAccessorCompositeKeyInternal<false>( - accessor, key_attr_ids, argument_ids); - } + return true; } template <bool check_for_null_keys> bool upsertValueAccessorCompositeKeyInternal(ValueAccessor *accessor, + ColumnVectorsValueAccessor *temp_accessor, const std::vector<attribute_id> &key_attr_ids, const std::vector<attribute_id> &argument_ids) { return InvokeOnAnyValueAccessor( accessor, [&](auto *accessor) -> bool { // NOLINT(build/c++11) accessor->beginIteration(); + temp_accessor->beginIteration(); void *prealloc_bucket = allocateBucket(); while (accessor->next()) { + temp_accessor->next(); + if (check_for_null_keys) { const bool is_null = key_manager_.writeNullableUntypedKeyFromValueAccessorToBucket( @@ -186,30 +241,29 @@ class ThreadPrivateAggregationStateHashTable : public AggregationStateHashTableB key_attr_ids, prealloc_bucket); } - void *bucket = locateBucketWithPrealloc(prealloc_bucket); - if (bucket == prealloc_bucket) { - payload_manager_.initializeStates(bucket); - prealloc_bucket = allocateBucket(); - } else { - payload_manager_.template updateStates<check_for_null_keys>( - bucket, accessor, argument_ids); - } + + void *bucket = locateBucketWithPrealloc(&prealloc_bucket); + payload_manager_.updateStates( + bucket, accessor, temp_accessor, argument_ids); } // Reclaim the last unused bucket --buckets_allocated_; return true; }); + return true; } void mergeHashTable(const ThreadPrivateAggregationStateHashTable *source_hash_table) { source_hash_table->forEachKeyAndStates( [&](const void *source_key, const void *source_states) -> void { - bool is_empty; - void *bucket = locateBucket(source_key, &is_empty); - if (is_empty) { + auto slot_it = slots_.find(source_key); + if (slot_it == slots_.end()) { + void *bucket = allocateBucket(); + key_manager_.writeUntypedKeyToBucket(source_key, bucket); payload_manager_.copyStates(bucket, source_states); + slots_.emplace(key_manager_.getUntypedKeyComponent(bucket), bucket); } else { - payload_manager_.mergeStates(bucket, source_states); + payload_manager_.mergeStates(slot_it->second, source_states); } }); } @@ -233,26 +287,28 @@ class ThreadPrivateAggregationStateHashTable : public AggregationStateHashTableB return static_cast<char *>(buckets_) + bucket_id * bucket_size_; } - inline void* locateBucket(const void *key, bool *is_empty) { + inline void* locateBucket(const void *key) { auto slot_it = slots_.find(key); if (slot_it == slots_.end()) { void *bucket = allocateBucket(); key_manager_.writeUntypedKeyToBucket(key, bucket); + payload_manager_.initializeStates(bucket); slots_.emplace(key_manager_.getUntypedKeyComponent(bucket), bucket); - *is_empty = true; return bucket; } else { - *is_empty = false; return slot_it->second; } } - inline void* locateBucketWithPrealloc(void *prealloc_bucket) { - const void *key = key_manager_.getUntypedKeyComponent(prealloc_bucket); + inline void* locateBucketWithPrealloc(void **prealloc_bucket) { + void *bucket = *prealloc_bucket; + const void *key = key_manager_.getUntypedKeyComponent(bucket); auto slot_it = slots_.find(key); if (slot_it == slots_.end()) { - slots_.emplace(key, prealloc_bucket); - return prealloc_bucket; + payload_manager_.initializeStates(bucket); + slots_.emplace(key, bucket); + *prealloc_bucket = allocateBucket(); + return bucket; } else { return slot_it->second; } @@ -295,7 +351,7 @@ class ThreadPrivateAggregationStateHashTable : public AggregationStateHashTableB std::cerr << "Buckets: \n"; for (const auto &pair : slots_) { std::cerr << pair.first << " -- " << pair.second << "\n"; - std::cerr << *static_cast<const int *>(pair.second) << "\n"; + std::cerr << *static_cast<const std::uint64_t *>(pair.second) << "\n"; } } @@ -335,4 +391,3 @@ class ThreadPrivateAggregationStateHashTable : public AggregationStateHashTableB } // namespace quickstep #endif // QUICKSTEP_STORAGE_AGGREGATION_STATE_HASH_TABLE_HPP_ - http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/storage/AggregationStateManager.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationStateManager.hpp b/storage/AggregationStateManager.hpp index 98dca90..81a2db8 100644 --- a/storage/AggregationStateManager.hpp +++ b/storage/AggregationStateManager.hpp @@ -28,6 +28,7 @@ #include "catalog/CatalogTypedefs.hpp" #include "threading/SpinMutex.hpp" #include "threading/SpinSharedMutex.hpp" +#include "types/containers/ColumnVectorsValueAccessor.hpp" #include "utility/InlineMemcpy.hpp" #include "utility/Macros.hpp" #include "utility/ScopedBuffer.hpp" @@ -71,8 +72,7 @@ class AggregationStateManager { new(initial_states_.get()) Mutex; } for (std::size_t i = 0; i < handles_.size(); ++i) { - handles_[i]->initializeState( - static_cast<char *>(initial_states_.get()) + state_offsets_[i]); + handles_[i]->initializeState(getStateComponent(initial_states_.get(), i)); } } @@ -88,33 +88,39 @@ class AggregationStateManager { copyStates(states, initial_states_.get()); } - template <bool check_for_null_keys, typename ValueAccessorT> - inline void updateState(void *states, - ValueAccessorT *accessor, - const attribute_id argument_id) const { - // TODO: templates on whether to check invalid attribute id - DCHECK_NE(argument_id, kInvalidAttributeID); - - const void *value = - accessor->template getUntypedValue<check_for_null_keys>(argument_id); - if (check_for_null_keys && value == nullptr) { - return; + template <typename ValueAccessorT> + inline void updateStates(void *states, + ValueAccessorT *accessor, + const std::vector<attribute_id> &argument_ids) const { + for (std::size_t i = 0; i < argument_ids.size(); ++i) { + // TODO: templates on whether to check invalid attribute id + const void *value; + const attribute_id argument_id = argument_ids[i]; + if (argument_id == kInvalidAttributeID) { + value = nullptr; + } else { + value = accessor->template getUntypedValue<false>(argument_id); + // TODO: check null + } + accumulate_functors_[i](getStateComponent(states, i), value); } - accumulate_functors_.front()(states, value); } - template <bool check_for_null_keys, typename ValueAccessorT> + template <typename ValueAccessorT> inline void updateStates(void *states, ValueAccessorT *accessor, + ColumnVectorsValueAccessor *temp_accessor, const std::vector<attribute_id> &argument_ids) const { for (std::size_t i = 0; i < argument_ids.size(); ++i) { // TODO: templates on whether to check invalid attribute id - DCHECK_NE(argument_ids[i], kInvalidAttributeID); - - const void *value = - accessor->template getUntypedValue<check_for_null_keys>(argument_ids[i]); - if (check_for_null_keys && value == nullptr) { - return; + const void *value = nullptr; + const attribute_id argument_id = argument_ids[i]; + if (argument_id >= 0) { + value = accessor->template getUntypedValue<false>(argument_id); + // TODO: check null + } else if (argument_id != kInvalidAttributeID){ + value = temp_accessor->template getUntypedValue<false>(-(argument_id+2)); + // TODO: check null } accumulate_functors_[i](getStateComponent(states, i), value); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/storage/HashTableBase.hpp ---------------------------------------------------------------------- diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp index f823494..04e82a0 100644 --- a/storage/HashTableBase.hpp +++ b/storage/HashTableBase.hpp @@ -29,6 +29,7 @@ namespace quickstep { class AggregationResultIterator; +class ColumnVectorsValueAccessor; /** \addtogroup Storage * @{ @@ -88,10 +89,12 @@ class AggregationStateHashTableBase { virtual ~AggregationStateHashTableBase() {} virtual bool upsertValueAccessor(ValueAccessor *accessor, + ColumnVectorsValueAccessor *temp_accessor, const attribute_id key_attr_id, const std::vector<attribute_id> &argument_ids) = 0; virtual bool upsertValueAccessorCompositeKey(ValueAccessor *accessor, + ColumnVectorsValueAccessor *temp_accessor, const std::vector<attribute_id> &key_attr_ids, const std::vector<attribute_id> &argument_ids) = 0; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/storage/StorageBlock.cpp ---------------------------------------------------------------------- diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp index abb17f1..ca4f897 100644 --- a/storage/StorageBlock.cpp +++ b/storage/StorageBlock.cpp @@ -431,12 +431,10 @@ ScopedBuffer StorageBlock::aggregate( void StorageBlock::aggregateGroupBy( const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments, - const std::vector<std::unique_ptr<const Scalar>> &group_by, + const std::vector<attribute_id> &group_by_attribute_ids, const Predicate *predicate, - AggregationStateHashTableBase *hash_table, - std::unique_ptr<TupleIdSequence> *reuse_matches, - std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const { - DCHECK_GT(group_by.size(), 0u) + AggregationStateHashTableBase *hash_table) const { + DCHECK_GT(group_by_attribute_ids.size(), 0u) << "Called aggregateGroupBy() with zero GROUP BY expressions"; SubBlocksReference sub_blocks_ref(*tuple_store_, @@ -446,68 +444,43 @@ void StorageBlock::aggregateGroupBy( // IDs of 'arguments' as attributes in the ValueAccessor we create below. std::vector<attribute_id> argument_ids; - // IDs of GROUP BY key element(s) in the ValueAccessor we create below. - std::vector<attribute_id> key_ids; - - // An intermediate ValueAccessor that stores the materialized 'arguments' for - // this aggregate, as well as the GROUP BY expression values. + std::unique_ptr<TupleIdSequence> matches; + std::unique_ptr<ValueAccessor> accessor; ColumnVectorsValueAccessor temp_result; - { - std::unique_ptr<ValueAccessor> accessor; - if (predicate) { - if (!*reuse_matches) { - // If there is a filter predicate that hasn't already been evaluated, - // evaluate it now and save the results for other aggregates on this - // same block. - reuse_matches->reset(getMatchesForPredicate(predicate)); - } - // Create a filtered ValueAccessor that only iterates over predicate - // matches. - accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get())); - } else { - // Create a ValueAccessor that iterates over all tuples in this block - accessor.reset(tuple_store_->createValueAccessor()); - } + if (predicate) { + // Create a filtered ValueAccessor that only iterates over predicate + // matches. + matches.reset(getMatchesForPredicate(predicate)); + accessor.reset(tuple_store_->createValueAccessor(matches.get())); + } else { + // Create a ValueAccessor that iterates over all tuples in this block + accessor.reset(tuple_store_->createValueAccessor()); + } - attribute_id attr_id = 0; - - // First, put GROUP BY keys into 'temp_result'. - if (reuse_group_by_vectors->empty()) { - // Compute GROUP BY values from group_by Scalars, and store them in - // reuse_group_by_vectors for reuse by other aggregates on this same - // block. - reuse_group_by_vectors->reserve(group_by.size()); - for (const std::unique_ptr<const Scalar> &group_by_element : group_by) { - reuse_group_by_vectors->emplace_back( - group_by_element->getAllValues(accessor.get(), &sub_blocks_ref)); - temp_result.addColumn(reuse_group_by_vectors->back().get(), false); - key_ids.push_back(attr_id++); - } + attribute_id temp_result_attribute_id = 0; + for (const auto &argument_vec : arguments) { + CHECK_LE(argument_vec.size(), 1); + + if (argument_vec.size() == 0) { + argument_ids.emplace_back(kInvalidAttributeID); } else { - // Reuse precomputed GROUP BY values from reuse_group_by_vectors. - DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size()) - << "Wrong number of reuse_group_by_vectors"; - for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) { - temp_result.addColumn(reuse_cv.get(), false); - key_ids.push_back(attr_id++); + const auto &argument = argument_vec.front(); + const attribute_id argument_id = + argument->getAttributeIdForValueAccessor(); + if (argument_id != kInvalidAttributeID) { + argument_ids.emplace_back(argument_id); + } else { + temp_result.addColumn(argument->getAllValues(accessor.get(), &sub_blocks_ref)); + argument_ids.push_back(-(temp_result_attribute_id + 2)); + ++temp_result_attribute_id; } } - - // Compute argument vectors and add them to 'temp_result'. - for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) { - for (const std::unique_ptr<const Scalar> &args : argument) { - temp_result.addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref)); - argument_ids.push_back(attr_id++); - } - if (argument.empty()) { - argument_ids.push_back(kInvalidAttributeID); - } - } } - hash_table->upsertValueAccessorCompositeKey(&temp_result, - key_ids, + hash_table->upsertValueAccessorCompositeKey(accessor.get(), + temp_result_attribute_id == 0 ? nullptr : &temp_result, + group_by_attribute_ids, argument_ids); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c41451db/storage/StorageBlock.hpp ---------------------------------------------------------------------- diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp index 08d81d0..a4e8448 100644 --- a/storage/StorageBlock.hpp +++ b/storage/StorageBlock.hpp @@ -462,11 +462,9 @@ class StorageBlock : public StorageBlockBase { */ void aggregateGroupBy( const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments, - const std::vector<std::unique_ptr<const Scalar>> &group_by, + const std::vector<attribute_id> &group_by_attribute_ids, const Predicate *predicate, - AggregationStateHashTableBase *hash_table, - std::unique_ptr<TupleIdSequence> *reuse_matches, - std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const; + AggregationStateHashTableBase *hash_table) const; /** * @brief Perform an UPDATE query over the tuples in this StorageBlock.