http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/expressions/aggregation/AggregationHandleSum.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp index 642d88d..00b229e 100644 --- a/expressions/aggregation/AggregationHandleSum.cpp +++ b/expressions/aggregation/AggregationHandleSum.cpp @@ -25,8 +25,8 @@ #include <vector> #include "catalog/CatalogTypedefs.hpp" -#include "storage/HashTable.hpp" -#include "storage/HashTableFactory.hpp" +#include "expressions/aggregation/AggregationID.hpp" +#include "storage/PackedPayloadAggregationStateHashTable.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" #include "types/TypeFactory.hpp" @@ -43,7 +43,8 @@ namespace quickstep { class StorageManager; AggregationHandleSum::AggregationHandleSum(const Type &type) - : argument_type_(type), block_update_(false) { + : AggregationConcreteHandle(AggregationID::kSum), + argument_type_(type) { // We sum Int as Long and Float as Double so that we have more headroom when // adding many values. TypeID type_precision_id; @@ -79,47 +80,26 @@ AggregationHandleSum::AggregationHandleSum(const Type &type) result_type_ = &sum_type.getNullableVersion(); } -AggregationStateHashTableBase* AggregationHandleSum::createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector<const Type *> &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const { - return AggregationStateHashTableFactory<AggregationStateSum>::CreateResizable( - hash_table_impl, group_by_types, estimated_num_groups, storage_manager); -} +AggregationState* AggregationHandleSum::accumulate( + ValueAccessor *accessor, + ColumnVectorsValueAccessor *aux_accessor, + const std::vector<attribute_id> &argument_ids) const { + DCHECK_EQ(1u, argument_ids.size()) + << "Got wrong number of attributes for SUM: " << argument_ids.size(); -AggregationState* AggregationHandleSum::accumulateColumnVectors( - const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const { - DCHECK_EQ(1u, column_vectors.size()) - << "Got wrong number of ColumnVectors for SUM: " << column_vectors.size(); - std::size_t num_tuples = 0; - TypedValue cv_sum = fast_operator_->accumulateColumnVector( - blank_state_.sum_, *column_vectors.front(), &num_tuples); - return new AggregationStateSum(std::move(cv_sum), num_tuples == 0); -} + const attribute_id argument_id = argument_ids.front(); + DCHECK_NE(argument_id, kInvalidAttributeID); -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION -AggregationState* AggregationHandleSum::accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector<attribute_id> &accessor_ids) const { - DCHECK_EQ(1u, accessor_ids.size()) - << "Got wrong number of attributes for SUM: " << accessor_ids.size(); + ValueAccessor *target_accessor = + argument_id >= 0 ? accessor : aux_accessor; + const attribute_id target_argument_id = + argument_id >= 0 ? 
argument_id : -(argument_id+2); std::size_t num_tuples = 0; TypedValue va_sum = fast_operator_->accumulateValueAccessor( - blank_state_.sum_, accessor, accessor_ids.front(), &num_tuples); + blank_state_.sum_, target_accessor, target_argument_id, &num_tuples); return new AggregationStateSum(std::move(va_sum), num_tuples == 0); } -#endif - -void AggregationHandleSum::aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &argument_ids, - const std::vector<attribute_id> &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const { - DCHECK_EQ(1u, argument_ids.size()) - << "Got wrong number of arguments for SUM: " << argument_ids.size(); -} void AggregationHandleSum::mergeStates(const AggregationState &source, AggregationState *destination) const { @@ -134,8 +114,8 @@ void AggregationHandleSum::mergeStates(const AggregationState &source, sum_destination->null_ = sum_destination->null_ && sum_source.null_; } -void AggregationHandleSum::mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const { +void AggregationHandleSum::mergeStates(const std::uint8_t *source, + std::uint8_t *destination) const { const TypedValue *src_sum_ptr = reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_); const bool *src_null_ptr = @@ -164,27 +144,10 @@ ColumnVector* AggregationHandleSum::finalizeHashTable( const AggregationStateHashTableBase &hash_table, std::vector<std::vector<TypedValue>> *group_by_keys, int index) const { - return finalizeHashTableHelperFast<AggregationHandleSum, - AggregationStateFastHashTable>( - *result_type_, hash_table, group_by_keys, index); -} - -AggregationState* -AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) const { - return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< - AggregationHandleSum, - AggregationStateSum>(distinctify_hash_table); -} - -void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const { - aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< + return finalizeHashTableHelper< AggregationHandleSum, - AggregationStateFastHashTable>( - distinctify_hash_table, aggregation_hash_table, index); + PackedPayloadSeparateChainingAggregationStateHashTable>( + *result_type_, hash_table, group_by_keys, index); } } // namespace quickstep
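For reference, the new accumulate() path above resolves each argument id against two accessors: non-negative ids address the block's own ValueAccessor, while results of non-trivial expressions travel in the auxiliary ColumnVectorsValueAccessor under negative ids, with auxiliary column k carried as -(k + 2). A minimal standalone sketch of that convention (the helper names below are illustrative, not part of this patch):

    #include <cassert>

    // Illustrative helpers for the argument-id convention used by accumulate():
    // non-negative ids refer to the base ValueAccessor, negative ids refer to
    // columns of the auxiliary ColumnVectorsValueAccessor, where auxiliary
    // column k is encoded as -(k + 2).
    inline int EncodeAuxColumn(const int aux_column) {
      return -(aux_column + 2);
    }

    inline int DecodeAuxColumn(const int argument_id) {
      assert(argument_id < 0);
      return -(argument_id + 2);
    }

    int main() {
      assert(EncodeAuxColumn(0) == -2);   // first non-trivial expression
      assert(EncodeAuxColumn(1) == -3);   // second non-trivial expression
      assert(DecodeAuxColumn(-3) == 1);   // round-trips back to column 1
      return 0;
    }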
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/expressions/aggregation/AggregationHandleSum.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp index f0d23e1..9fb7706 100644 --- a/expressions/aggregation/AggregationHandleSum.hpp +++ b/expressions/aggregation/AggregationHandleSum.hpp @@ -28,7 +28,6 @@ #include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" -#include "storage/FastHashTable.hpp" #include "storage/HashTableBase.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" @@ -41,6 +40,7 @@ namespace quickstep { class ColumnVector; +class ColumnVectorsValueAccessor; class StorageManager; class ValueAccessor; @@ -101,16 +101,18 @@ class AggregationHandleSum : public AggregationConcreteHandle { public: ~AggregationHandleSum() override {} + std::vector<const Type *> getArgumentTypes() const override { + return {&argument_type_}; + } + + const Type* getResultType() const override { + return result_type_; + } + AggregationState* createInitialState() const override { return new AggregationStateSum(blank_state_); } - AggregationStateHashTableBase* createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector<const Type *> &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const override; - inline void iterateUnaryInl(AggregationStateSum *state, const TypedValue &value) const { DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature())); @@ -121,28 +123,19 @@ class AggregationHandleSum : public AggregationConcreteHandle { state->null_ = false; } - inline void iterateUnaryInlFast(const TypedValue &value, - std::uint8_t *byte_ptr) const { - DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature())); - if (value.isNull()) return; - TypedValue *sum_ptr = - reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_); - bool *null_ptr = - reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_); - *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, value); - *null_ptr = false; - } + AggregationState* accumulate( + ValueAccessor *accessor, + ColumnVectorsValueAccessor *aux_accessor, + const std::vector<attribute_id> &argument_ids) const override; - inline void updateStateUnary(const TypedValue &argument, - std::uint8_t *byte_ptr) const override { - if (!block_update_) { - iterateUnaryInlFast(argument, byte_ptr); - } - } + void mergeStates(const AggregationState &source, + AggregationState *destination) const override; - void blockUpdate() override { block_update_ = true; } + TypedValue finalize(const AggregationState &state) const override; - void allowUpdate() override { block_update_ = false; } + std::size_t getPayloadSize() const override { + return blank_state_.getPayloadSize(); + } void initPayload(std::uint8_t *byte_ptr) const override { TypedValue *sum_ptr = @@ -161,41 +154,23 @@ class AggregationHandleSum : public AggregationConcreteHandle { } } - AggregationState* accumulateColumnVectors( - const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) - const override; - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - AggregationState* accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector<attribute_id> &accessor_id) const override; -#endif - - void aggregateValueAccessorIntoHashTable( - 
ValueAccessor *accessor, - const std::vector<attribute_id> &argument_ids, - const std::vector<attribute_id> &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const override; - - void mergeStates(const AggregationState &source, - AggregationState *destination) const override; - - void mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const override; - - TypedValue finalize(const AggregationState &state) const override; - - inline TypedValue finalizeHashTableEntry( - const AggregationState &state) const { - return static_cast<const AggregationStateSum &>(state).sum_; + inline void updateStateUnary(const TypedValue &argument, + std::uint8_t *byte_ptr) const override { + DCHECK(argument.isPlausibleInstanceOf(argument_type_.getSignature())); + if (argument.isNull()) return; + TypedValue *sum_ptr = + reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_); + bool *null_ptr = + reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_); + *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, argument); + *null_ptr = false; } - inline TypedValue finalizeHashTableEntryFast( - const std::uint8_t *byte_ptr) const { - std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr); - TypedValue *sum_ptr = - reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset_); - return *sum_ptr; + void mergeStates(const std::uint8_t *source, + std::uint8_t *destination) const override; + + inline TypedValue finalizeHashTableEntry(const std::uint8_t *byte_ptr) const { + return *reinterpret_cast<const TypedValue *>(byte_ptr + blank_state_.sum_offset_); } ColumnVector* finalizeHashTable( @@ -203,29 +178,6 @@ class AggregationHandleSum : public AggregationConcreteHandle { std::vector<std::vector<TypedValue>> *group_by_keys, int index) const override; - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForSingle() - * for SUM aggregation. - */ - AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) - const override; - - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() - * for SUM aggregation. 
- */ - void aggregateOnDistinctifyHashTableForGroupBy( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const override; - - std::size_t getPayloadSize() const override { - return blank_state_.getPayloadSize(); - } - private: friend class AggregateFunctionSum; @@ -242,8 +194,6 @@ class AggregationHandleSum : public AggregationConcreteHandle { std::unique_ptr<UncheckedBinaryOperator> fast_operator_; std::unique_ptr<UncheckedBinaryOperator> merge_operator_; - bool block_update_; - DISALLOW_COPY_AND_ASSIGN(AggregationHandleSum); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/expressions/aggregation/AggregationID.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationID.hpp b/expressions/aggregation/AggregationID.hpp index 1efb35c..cd18d47 100644 --- a/expressions/aggregation/AggregationID.hpp +++ b/expressions/aggregation/AggregationID.hpp @@ -32,9 +32,11 @@ namespace quickstep { enum class AggregationID { kAvg = 0, kCount, + kDistinct, kMax, kMin, - kSum + kSum, + kUnknown }; /** @} */ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/expressions/aggregation/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt index e9503f7..bd239d4 100644 --- a/expressions/aggregation/CMakeLists.txt +++ b/expressions/aggregation/CMakeLists.txt @@ -146,10 +146,8 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl glog quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_HashTable + quickstep_expressions_aggregation_AggregationID quickstep_storage_HashTableBase - quickstep_storage_HashTableFactory quickstep_threading_SpinMutex quickstep_types_TypedValue quickstep_types_containers_ColumnVector @@ -157,6 +155,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl target_link_libraries(quickstep_expressions_aggregation_AggregationHandle glog quickstep_catalog_CatalogTypedefs + quickstep_expressions_aggregation_AggregationID quickstep_storage_HashTableBase quickstep_types_TypedValue quickstep_utility_Macros) @@ -165,10 +164,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_HashTable + quickstep_expressions_aggregation_AggregationID quickstep_storage_HashTableBase - quickstep_storage_HashTableFactory + quickstep_storage_PackedPayloadAggregationStateHashTable quickstep_threading_SpinMutex quickstep_types_Type quickstep_types_TypeFactory @@ -183,12 +181,12 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_HashTable + quickstep_expressions_aggregation_AggregationID quickstep_storage_HashTableBase - quickstep_storage_HashTableFactory + quickstep_storage_PackedPayloadAggregationStateHashTable quickstep_storage_ValueAccessor quickstep_storage_ValueAccessorUtil + quickstep_types_LongType 
quickstep_types_TypeFactory quickstep_types_TypeID quickstep_types_TypedValue @@ -199,8 +197,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleDistinc glog quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle - quickstep_storage_HashTable + quickstep_expressions_aggregation_AggregationID quickstep_storage_HashTableBase + quickstep_storage_PackedPayloadAggregationStateHashTable quickstep_types_TypedValue quickstep_utility_Macros) target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax @@ -208,10 +207,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_HashTable + quickstep_expressions_aggregation_AggregationID quickstep_storage_HashTableBase - quickstep_storage_HashTableFactory + quickstep_storage_PackedPayloadAggregationStateHashTable quickstep_threading_SpinMutex quickstep_types_Type quickstep_types_TypedValue @@ -225,10 +223,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_HashTable + quickstep_expressions_aggregation_AggregationID quickstep_storage_HashTableBase - quickstep_storage_HashTableFactory + quickstep_storage_PackedPayloadAggregationStateHashTable quickstep_threading_SpinMutex quickstep_types_Type quickstep_types_TypedValue @@ -242,10 +239,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_HashTable + quickstep_expressions_aggregation_AggregationID quickstep_storage_HashTableBase - quickstep_storage_HashTableFactory + quickstep_storage_PackedPayloadAggregationStateHashTable quickstep_threading_SpinMutex quickstep_types_Type quickstep_types_TypeFactory http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/query_execution/QueryContext.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp index 895c2ea..ed0f99c 100644 --- a/query_execution/QueryContext.hpp +++ b/query_execution/QueryContext.hpp @@ -200,20 +200,6 @@ class QueryContext { } /** - * @brief Destroy the payloads from the aggregation hash tables. - * - * @warning After calling these methods, the hash table will be in an invalid - * state. No other operation should be performed on them. - * - * @param id The ID of the AggregationOperationState. - **/ - inline void destroyAggregationHashTablePayload(const aggregation_state_id id) { - DCHECK_LT(id, aggregation_states_.size()); - DCHECK(aggregation_states_[id]); - aggregation_states_[id]->destroyAggregationHashTablePayload(); - } - - /** * @brief Whether the given GeneratorFunctionHandle id is valid. * * @param id The GeneratorFunctionHandle id. 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/query_optimizer/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt index 7f90e11..7f75264 100644 --- a/query_optimizer/CMakeLists.txt +++ b/query_optimizer/CMakeLists.txt @@ -64,6 +64,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_expressions_Expressions_proto quickstep_expressions_aggregation_AggregateFunction quickstep_expressions_aggregation_AggregateFunction_proto + quickstep_expressions_aggregation_AggregationID quickstep_expressions_predicate_Predicate quickstep_expressions_scalar_Scalar quickstep_expressions_scalar_ScalarAttribute @@ -125,6 +126,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_relationaloperators_DropTableOperator quickstep_relationaloperators_FinalizeAggregationOperator quickstep_relationaloperators_HashJoinOperator + quickstep_relationaloperators_InitializeAggregationStateOperator quickstep_relationaloperators_InsertOperator quickstep_relationaloperators_NestedLoopsJoinOperator quickstep_relationaloperators_RelationalOperator @@ -145,6 +147,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_storage_StorageBlockLayout_proto quickstep_storage_SubBlockTypeRegistry quickstep_types_Type + quickstep_types_TypeID quickstep_types_Type_proto quickstep_types_TypedValue quickstep_types_TypedValue_proto http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index ce1452e..6694001 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -49,6 +49,7 @@ #include "expressions/Expressions.pb.h" #include "expressions/aggregation/AggregateFunction.hpp" #include "expressions/aggregation/AggregateFunction.pb.h" +#include "expressions/aggregation/AggregationID.hpp" #include "expressions/predicate/Predicate.hpp" #include "expressions/scalar/Scalar.hpp" #include "expressions/scalar/ScalarAttribute.hpp" @@ -105,6 +106,7 @@ #include "relational_operators/DropTableOperator.hpp" #include "relational_operators/FinalizeAggregationOperator.hpp" #include "relational_operators/HashJoinOperator.hpp" +#include "relational_operators/InitializeAggregationStateOperator.hpp" #include "relational_operators/InsertOperator.hpp" #include "relational_operators/NestedLoopsJoinOperator.hpp" #include "relational_operators/RelationalOperator.hpp" @@ -126,6 +128,7 @@ #include "storage/SubBlockTypeRegistry.hpp" #include "types/Type.hpp" #include "types/Type.pb.h" +#include "types/TypeID.hpp" #include "types/TypedValue.hpp" #include "types/TypedValue.pb.h" #include "types/containers/Tuple.pb.h" @@ -371,6 +374,91 @@ void ExecutionGenerator::dropAllTemporaryRelations() { } } +bool ExecutionGenerator::canUseCollisionFreeAggregation( + const P::AggregatePtr &aggregate, + const std::size_t estimated_num_groups, + std::size_t *exact_num_groups) const { + if (aggregate->grouping_expressions().size() != 1) { + return false; + } + + E::AttributeReferencePtr group_by_key_attr; + const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front(); + if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) { + return false; + } + + bool 
min_value_stat_is_exact; + bool max_value_stat_is_exact; + const TypedValue min_value = + cost_model_for_aggregation_->findMinValueStat( + aggregate, group_by_key_attr, &min_value_stat_is_exact); + const TypedValue max_value = + cost_model_for_aggregation_->findMaxValueStat( + aggregate, group_by_key_attr, &max_value_stat_is_exact); + if (min_value.isNull() || max_value.isNull() || + (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) { + return false; + } + + std::int64_t min_cpp_value; + std::int64_t max_cpp_value; + switch (group_by_key_attr->getValueType().getTypeID()) { + case TypeID::kInt: { + min_cpp_value = min_value.getLiteral<int>(); + max_cpp_value = max_value.getLiteral<int>(); + break; + } + case TypeID::kLong: { + min_cpp_value = min_value.getLiteral<std::int64_t>(); + max_cpp_value = max_value.getLiteral<std::int64_t>(); + break; + } + default: + return false; + } + + // TODO + if (min_cpp_value < 0 || + max_cpp_value > 1000000000 || + max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) { + return false; + } + + + for (const auto &agg_expr : aggregate->aggregate_expressions()) { + const E::AggregateFunctionPtr agg_func = + std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression()); + switch (agg_func->getAggregate().getAggregationID()) { + case AggregationID::kCount: // Fall through + case AggregationID::kSum: + break; + default: + return false; + } + + const auto &arguments = agg_func->getArguments(); + if (arguments.size() > 1) { + return false; + } + + if (arguments.size() == 1) { + switch (arguments.front()->getValueType().getTypeID()) { + case TypeID::kInt: // Fall through + case TypeID::kLong: + case TypeID::kFloat: + case TypeID::kDouble: + break; + default: + return false; + } + } + } + + *exact_num_groups = static_cast<std::size_t>(max_cpp_value) + 1; + return true; +} + void ExecutionGenerator::convertNamedExpressions( const std::vector<E::NamedExpressionPtr> &named_expressions, S::QueryContext::ScalarGroup *scalar_group_proto) { @@ -1454,6 +1542,8 @@ void ExecutionGenerator::convertAggregate( findRelationInfoOutputByPhysical(physical_plan->input()); aggr_state_proto->set_relation_id(input_relation_info->relation->getID()); + bool use_parallel_initialization = false; + std::vector<const Type*> group_by_types; for (const E::NamedExpressionPtr &grouping_expression : physical_plan->grouping_expressions()) { unique_ptr<const Scalar> execution_group_by_expression; @@ -1474,9 +1564,34 @@ void ExecutionGenerator::convertAggregate( } if (!group_by_types.empty()) { - // Right now, only SeparateChaining is supported. - aggr_state_proto->set_hash_table_impl_type( - serialization::HashTableImplType::SEPARATE_CHAINING); + const std::size_t estimated_num_groups = + cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan); + + std::size_t exact_num_groups; + const bool can_use_collision_free_aggregation = + canUseCollisionFreeAggregation(physical_plan, + estimated_num_groups, + &exact_num_groups); + + if (can_use_collision_free_aggregation) { + aggr_state_proto->set_hash_table_impl_type( + serialization::HashTableImplType::COLLISION_FREE_VECTOR); + std::cout << "Use collision free aggregation!\n" + << "Size = " << exact_num_groups << "\n"; + + aggr_state_proto->set_estimated_num_entries(exact_num_groups); + use_parallel_initialization = true; + } else { + // Otherwise, use SeparateChaining. 
+ aggr_state_proto->set_hash_table_impl_type( + serialization::HashTableImplType::SEPARATE_CHAINING); + std::cout << "Use normal aggregation\n" + << "Size = " << estimated_num_groups << "\n"; + + aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups)); + } + } else { + aggr_state_proto->set_estimated_num_entries(1uL); } for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) { @@ -1514,10 +1629,6 @@ void ExecutionGenerator::convertAggregate( aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto()); } - const std::size_t estimated_num_groups = - cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan); - aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups)); - const QueryPlan::DAGNodeIndex aggregation_operator_index = execution_plan_->addRelationalOperator( new AggregationOperator( @@ -1532,6 +1643,18 @@ void ExecutionGenerator::convertAggregate( false /* is_pipeline_breaker */); } + if (use_parallel_initialization) { + const QueryPlan::DAGNodeIndex initialize_aggregation_state_operator_index = + execution_plan_->addRelationalOperator( + new InitializeAggregationStateOperator( + query_handle_->query_id(), + aggr_state_index)); + + execution_plan_->addDirectDependency(aggregation_operator_index, + initialize_aggregation_state_operator_index, + true); + } + // Create InsertDestination proto. const CatalogRelation *output_relation = nullptr; const QueryContext::insert_destination_id insert_destination_index = http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/query_optimizer/ExecutionGenerator.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp index eba6eee..b52fe97 100644 --- a/query_optimizer/ExecutionGenerator.hpp +++ b/query_optimizer/ExecutionGenerator.hpp @@ -20,6 +20,7 @@ #ifndef QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_GENERATOR_HPP_ #define QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_GENERATOR_HPP_ +#include <cstddef> #include <memory> #include <string> #include <unordered_map> @@ -37,6 +38,7 @@ #include "query_optimizer/QueryHandle.hpp" #include "query_optimizer/QueryPlan.hpp" #include "query_optimizer/cost_model/CostModel.hpp" +#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp" #include "query_optimizer/expressions/ExprId.hpp" #include "query_optimizer/expressions/NamedExpression.hpp" #include "query_optimizer/expressions/Predicate.hpp" @@ -203,6 +205,10 @@ class ExecutionGenerator { */ std::string getNewRelationName(); + bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate, + const std::size_t estimated_num_groups, + std::size_t *exact_num_groups) const; + /** * @brief Sets up the info of the CatalogRelation represented by TableReference. * TableReference is not converted to any operator. @@ -427,7 +433,7 @@ class ExecutionGenerator { /** * @brief The cost model to use for estimating aggregation hash table size. */ - std::unique_ptr<cost::CostModel> cost_model_for_aggregation_; + std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_for_aggregation_; /** * @brief The cost model to use for estimating join hash table size. 
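For reference, the eligibility test added in canUseCollisionFreeAggregation() above reduces to a small predicate over the single integer group-by key's exact value range and the estimated group count; when it passes, the exact table size is max_cpp_value + 1. A condensed standalone restatement under that reading (constants are taken verbatim from the diff; the function name is illustrative):

    #include <cstddef>
    #include <cstdint>

    // Condensed restatement of the range check in canUseCollisionFreeAggregation():
    // the key must have exact, non-negative min/max statistics, a bounded max, and
    // a range that is not too sparse relative to the estimated number of groups
    // (at most 256 vector slots per estimated group).
    bool KeyRangeSuitsCollisionFreeVector(const std::int64_t min_value,
                                          const std::int64_t max_value,
                                          const std::size_t estimated_num_groups) {
      if (min_value < 0 || max_value > 1000000000) {
        return false;
      }
      return max_value / static_cast<double>(estimated_num_groups) <= 256.0;
    }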
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/relational_operators/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt index c18dc77..bd20059 100644 --- a/relational_operators/CMakeLists.txt +++ b/relational_operators/CMakeLists.txt @@ -47,6 +47,9 @@ add_library(quickstep_relationaloperators_FinalizeAggregationOperator FinalizeAggregationOperator.cpp FinalizeAggregationOperator.hpp) add_library(quickstep_relationaloperators_HashJoinOperator HashJoinOperator.cpp HashJoinOperator.hpp) +add_library(quickstep_relationaloperators_InitializeAggregationStateOperator + InitializeAggregationStateOperator.cpp + InitializeAggregationStateOperator.hpp) add_library(quickstep_relationaloperators_InsertOperator InsertOperator.cpp InsertOperator.hpp) add_library(quickstep_relationaloperators_NestedLoopsJoinOperator NestedLoopsJoinOperator.cpp @@ -254,6 +257,17 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator quickstep_utility_lipfilter_LIPFilterAdaptiveProber quickstep_utility_lipfilter_LIPFilterUtil tmb) +target_link_libraries(quickstep_relationaloperators_InitializeAggregationStateOperator + glog + quickstep_queryexecution_QueryContext + quickstep_queryexecution_WorkOrderProtosContainer + quickstep_queryexecution_WorkOrdersContainer + quickstep_relationaloperators_RelationalOperator + quickstep_relationaloperators_WorkOrder + quickstep_relationaloperators_WorkOrder_proto + quickstep_storage_AggregationOperationState + quickstep_utility_Macros + tmb) target_link_libraries(quickstep_relationaloperators_InsertOperator glog quickstep_catalog_CatalogRelation @@ -548,6 +562,7 @@ target_link_libraries(quickstep_relationaloperators quickstep_relationaloperators_DropTableOperator quickstep_relationaloperators_FinalizeAggregationOperator quickstep_relationaloperators_HashJoinOperator + quickstep_relationaloperators_InitializeAggregationStateOperator quickstep_relationaloperators_InsertOperator quickstep_relationaloperators_NestedLoopsJoinOperator quickstep_relationaloperators_RebuildWorkOrder http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/relational_operators/DestroyAggregationStateOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/DestroyAggregationStateOperator.cpp b/relational_operators/DestroyAggregationStateOperator.cpp index 49be43d..62ca9e7 100644 --- a/relational_operators/DestroyAggregationStateOperator.cpp +++ b/relational_operators/DestroyAggregationStateOperator.cpp @@ -58,13 +58,6 @@ bool DestroyAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosConta } void DestroyAggregationStateWorkOrder::execute() { - // NOTE(harshad) : The destroyAggregationHashTablePayload call is separate - // from the destroyAggregationState call. The reason is that the aggregation - // hash tables don't own the AggregationHandle objects. However the hash table - // class requires the handles for destroying the payload (see the - // destroyPayload methods in AggregationHandle classes). Therefore, we first - // destroy the payloads in the hash table and then destroy the hash table. 
- query_context_->destroyAggregationHashTablePayload(aggr_state_index_); query_context_->destroyAggregationState(aggr_state_index_); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/relational_operators/FinalizeAggregationOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp index 0cbf635..b66030b 100644 --- a/relational_operators/FinalizeAggregationOperator.cpp +++ b/relational_operators/FinalizeAggregationOperator.cpp @@ -44,15 +44,15 @@ bool FinalizeAggregationOperator::getAllWorkOrders( AggregationOperationState *agg_state = query_context->getAggregationState(aggr_state_index_); DCHECK(agg_state != nullptr); - for (int part_id = 0; - part_id < static_cast<int>(agg_state->getNumPartitions()); - ++part_id) { + for (std::size_t partition_id = 0; + partition_id < agg_state->getNumPartitions(); + ++partition_id) { container->addNormalWorkOrder( new FinalizeAggregationWorkOrder( query_id_, + partition_id, agg_state, - query_context->getInsertDestination(output_destination_index_), - part_id), + query_context->getInsertDestination(output_destination_index_)), op_index_); } } @@ -80,11 +80,7 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer } void FinalizeAggregationWorkOrder::execute() { - if (state_->isAggregatePartitioned()) { - state_->finalizeAggregatePartitioned(part_id_, output_destination_); - } else { - state_->finalizeAggregate(output_destination_); - } + state_->finalizeAggregate(partition_id_, output_destination_); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/relational_operators/FinalizeAggregationOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp index ae7127a..3c209b1 100644 --- a/relational_operators/FinalizeAggregationOperator.hpp +++ b/relational_operators/FinalizeAggregationOperator.hpp @@ -116,29 +116,29 @@ class FinalizeAggregationWorkOrder : public WorkOrder { * @note InsertWorkOrder takes ownership of \c state. * * @param query_id The ID of the query to which this operator belongs. + * @param partition_id The partition ID for which the Finalize aggregation + * work order is issued. * @param state The AggregationState to use. * @param output_destination The InsertDestination to insert aggregation * results. - * @param part_id The partition ID for which the Finalize aggregation work - * order is issued. Ignore if aggregation is not partitioned. 
*/ FinalizeAggregationWorkOrder(const std::size_t query_id, + const std::size_t partition_id, AggregationOperationState *state, - InsertDestination *output_destination, - const int part_id = -1) + InsertDestination *output_destination) : WorkOrder(query_id), + partition_id_(partition_id), state_(DCHECK_NOTNULL(state)), - output_destination_(DCHECK_NOTNULL(output_destination)), - part_id_(part_id) {} + output_destination_(DCHECK_NOTNULL(output_destination)) {} ~FinalizeAggregationWorkOrder() override {} void execute() override; private: + const std::size_t partition_id_; AggregationOperationState *state_; InsertDestination *output_destination_; - const int part_id_; DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/relational_operators/InitializeAggregationStateOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/InitializeAggregationStateOperator.cpp b/relational_operators/InitializeAggregationStateOperator.cpp new file mode 100644 index 0000000..dfee459 --- /dev/null +++ b/relational_operators/InitializeAggregationStateOperator.cpp @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/ + +#include "relational_operators/InitializeAggregationStateOperator.hpp" + +#include <vector> + +#include "query_execution/QueryContext.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" +#include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" +#include "storage/AggregationOperationState.hpp" + +#include "tmb/id_typedefs.h" + +namespace quickstep { + +bool InitializeAggregationStateOperator::getAllWorkOrders( + WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus) { + if (!started_) { + AggregationOperationState *agg_state = + query_context->getAggregationState(aggr_state_index_); + DCHECK(agg_state != nullptr); + + for (std::size_t part_id = 0; + part_id < agg_state->getNumInitializationPartitions(); + ++part_id) { + container->addNormalWorkOrder( + new InitializeAggregationStateWorkOrder(query_id_, + part_id, + agg_state), + op_index_); + } + started_ = true; + } + return started_; +} + +bool InitializeAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + // TODO + LOG(FATAL) << "Not implemented"; +} + +void InitializeAggregationStateWorkOrder::execute() { + state_->initializeState(partition_id_); +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/relational_operators/InitializeAggregationStateOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/InitializeAggregationStateOperator.hpp b/relational_operators/InitializeAggregationStateOperator.hpp new file mode 100644 index 0000000..10403b3 --- /dev/null +++ b/relational_operators/InitializeAggregationStateOperator.hpp @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/ + +#ifndef QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_ +#define QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_ + +#include <string> + +#include "query_execution/QueryContext.hpp" +#include "relational_operators/RelationalOperator.hpp" +#include "relational_operators/WorkOrder.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" + +namespace tmb { class MessageBus; } + +namespace quickstep { + +class AggregationOperationState; +class StorageManager; +class WorkOrderProtosContainer; +class WorkOrdersContainer; + +namespace serialization { class WorkOrder; } + +/** \addtogroup RelationalOperators + * @{ + */ + +class InitializeAggregationStateOperator : public RelationalOperator { + public: + InitializeAggregationStateOperator(const std::size_t query_id, + const QueryContext::aggregation_state_id aggr_state_index) + : RelationalOperator(query_id), + aggr_state_index_(aggr_state_index), + started_(false) {} + + ~InitializeAggregationStateOperator() override {} + + std::string getName() const override { + return "InitializeAggregationStateOperator"; + } + + bool getAllWorkOrders(WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus) override; + + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + + private: + const QueryContext::aggregation_state_id aggr_state_index_; + bool started_; + + DISALLOW_COPY_AND_ASSIGN(InitializeAggregationStateOperator); +}; + +class InitializeAggregationStateWorkOrder : public WorkOrder { + public: + InitializeAggregationStateWorkOrder(const std::size_t query_id, + const std::size_t partition_id, + AggregationOperationState *state) + : WorkOrder(query_id), + partition_id_(partition_id), + state_(DCHECK_NOTNULL(state)) {} + + ~InitializeAggregationStateWorkOrder() override {} + + void execute() override; + + private: + const std::size_t partition_id_; + + AggregationOperationState *state_; + + DISALLOW_COPY_AND_ASSIGN(InitializeAggregationStateWorkOrder); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index b942c1b..5de2653 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -39,15 +39,17 @@ #include "expressions/predicate/Predicate.hpp" #include "expressions/scalar/Scalar.hpp" #include "storage/AggregationOperationState.pb.h" -#include "storage/HashTable.hpp" -#include "storage/HashTableBase.hpp" #include "storage/HashTableFactory.hpp" +#include "storage/HashTableBase.hpp" #include "storage/InsertDestination.hpp" +#include "storage/PackedPayloadAggregationStateHashTable.hpp" #include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" #include "storage/StorageManager.hpp" +#include "storage/SubBlocksReference.hpp" #include "storage/TupleIdSequence.hpp" #include "storage/ValueAccessor.hpp" +#include "storage/ValueAccessorUtil.hpp" #include "types/TypedValue.hpp" #include "types/containers/ColumnVector.hpp" #include "types/containers/ColumnVectorsValueAccessor.hpp" @@ -80,50 +82,63 @@ 
AggregationOperationState::AggregationOperationState( const std::vector<HashTableImplType> &distinctify_hash_table_impl_types, StorageManager *storage_manager) : input_relation_(input_relation), - is_aggregate_partitioned_(checkAggregatePartitioned( - estimated_num_entries, is_distinct, group_by, aggregate_functions)), + is_aggregate_collision_free_(false), + is_aggregate_partitioned_(false), predicate_(predicate), - group_by_list_(std::move(group_by)), - arguments_(std::move(arguments)), is_distinct_(std::move(is_distinct)), storage_manager_(storage_manager) { + if (!group_by.empty()) { + if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) { + is_aggregate_collision_free_ = true; + } else { + is_aggregate_partitioned_ = checkAggregatePartitioned( + estimated_num_entries, is_distinct_, group_by, aggregate_functions); + } + } + // Sanity checks: each aggregate has a corresponding list of arguments. - DCHECK(aggregate_functions.size() == arguments_.size()); + DCHECK(aggregate_functions.size() == arguments.size()); // Get the types of GROUP BY expressions for creating HashTables below. - std::vector<const Type *> group_by_types; - for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) { - group_by_types.emplace_back(&group_by_element->getType()); + for (const std::unique_ptr<const Scalar> &group_by_element : group_by) { + group_by_types_.emplace_back(&group_by_element->getType()); } - std::vector<AggregationHandle *> group_by_handles; - group_by_handles.clear(); + // Prepare group-by element attribute ids and non-trivial expressions. + for (std::unique_ptr<const Scalar> &group_by_element : group_by) { + const attribute_id attr_id = + group_by_element->getAttributeIdForValueAccessor(); + if (attr_id == kInvalidAttributeID) { + const attribute_id non_trivial_attr_id = + -(static_cast<attribute_id>(non_trivial_expressions_.size()) + 2); + non_trivial_expressions_.emplace_back(group_by_element.release()); + group_by_key_ids_.emplace_back(non_trivial_attr_id); + } else { + group_by_key_ids_.emplace_back(attr_id); + } + } if (aggregate_functions.size() == 0) { // If there is no aggregation function, then it is a distinctify operation // on the group-by expressions. - DCHECK_GT(group_by_list_.size(), 0u); + DCHECK_GT(group_by_key_ids_.size(), 0u); handles_.emplace_back(new AggregationHandleDistinct()); - arguments_.push_back({}); is_distinct_.emplace_back(false); group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries, hash_table_impl_type, - group_by_types, - {1}, - handles_, + group_by_types_, + {handles_.front().get()}, storage_manager)); } else { + std::vector<AggregationHandle *> group_by_handles; + // Set up each individual aggregate in this operation. 
std::vector<const AggregateFunction *>::const_iterator agg_func_it = aggregate_functions.begin(); - std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator - args_it = arguments_.begin(); + std::vector<std::vector<std::unique_ptr<const Scalar>>>::iterator + args_it = arguments.begin(); std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin(); - std::vector<HashTableImplType>::const_iterator - distinctify_hash_table_impl_types_it = - distinctify_hash_table_impl_types.begin(); - std::vector<std::size_t> payload_sizes; for (; agg_func_it != aggregate_functions.end(); ++agg_func_it, ++args_it, ++is_distinct_it) { // Get the Types of this aggregate's arguments so that we can create an @@ -133,6 +148,22 @@ AggregationOperationState::AggregationOperationState( argument_types.emplace_back(&argument->getType()); } + // Prepare argument attribute ids and non-trivial expressions. + std::vector<attribute_id> argument_ids; + for (std::unique_ptr<const Scalar> &argument : *args_it) { + const attribute_id attr_id = + argument->getAttributeIdForValueAccessor(); + if (attr_id == kInvalidAttributeID) { + const attribute_id non_trivial_attr_id = + -(static_cast<attribute_id>(non_trivial_expressions_.size()) + 2); + non_trivial_expressions_.emplace_back(argument.release()); + argument_ids.emplace_back(non_trivial_attr_id); + } else { + argument_ids.emplace_back(attr_id); + } + } + argument_ids_.emplace_back(std::move(argument_ids)); + // Sanity checks: aggregate function exists and can apply to the specified // arguments. DCHECK(*agg_func_it != nullptr); @@ -142,85 +173,43 @@ AggregationOperationState::AggregationOperationState( // to do actual aggregate computation. handles_.emplace_back((*agg_func_it)->createHandle(argument_types)); - if (!group_by_list_.empty()) { + if (!group_by_key_ids_.empty()) { // Aggregation with GROUP BY: combined payload is partially updated in // the presence of DISTINCT. if (*is_distinct_it) { - handles_.back()->blockUpdate(); + LOG(FATAL) << "Distinct aggregation not supported"; } - group_by_handles.emplace_back(handles_.back()); - payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize()); + group_by_handles.emplace_back(handles_.back().get()); } else { // Aggregation without GROUP BY: create a single global state. single_states_.emplace_back(handles_.back()->createInitialState()); - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - // See if all of this aggregate's arguments are attributes in the input - // relation. If so, remember the attribute IDs so that we can do copy - // elision when actually performing the aggregation. - std::vector<attribute_id> local_arguments_as_attributes; - local_arguments_as_attributes.reserve(args_it->size()); - for (const std::unique_ptr<const Scalar> &argument : *args_it) { - const attribute_id argument_id = - argument->getAttributeIdForValueAccessor(); - if (argument_id == -1) { - local_arguments_as_attributes.clear(); - break; - } else { - DCHECK_EQ(input_relation_.getID(), - argument->getRelationIdForValueAccessor()); - local_arguments_as_attributes.push_back(argument_id); - } - } - - arguments_as_attributes_.emplace_back( - std::move(local_arguments_as_attributes)); -#endif } + } - // Initialize the corresponding distinctify hash table if this is a - // DISTINCT aggregation. 
- if (*is_distinct_it) { - std::vector<const Type *> key_types(group_by_types); - key_types.insert( - key_types.end(), argument_types.begin(), argument_types.end()); - // TODO(jianqiao): estimated_num_entries is quite inaccurate for - // estimating the number of entries in the distinctify hash table. - // We may estimate for each distinct aggregation an - // estimated_num_distinct_keys value during query optimization, if it's - // worth. - distinctify_hashtables_.emplace_back( - AggregationStateFastHashTableFactory::CreateResizable( - *distinctify_hash_table_impl_types_it, - key_types, + // Aggregation with GROUP BY: create a HashTable pool. + if (!group_by_key_ids_.empty()) { + if (is_aggregate_collision_free_) { + collision_free_hashtable_.reset( + AggregationStateHashTableFactory::CreateResizable( + hash_table_impl_type, + group_by_types_, estimated_num_entries, - {0}, - {}, + group_by_handles, storage_manager)); - ++distinctify_hash_table_impl_types_it; - } else { - distinctify_hashtables_.emplace_back(nullptr); - } - } - - if (!group_by_handles.empty()) { - // Aggregation with GROUP BY: create a HashTable pool. - if (!is_aggregate_partitioned_) { - group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries, - hash_table_impl_type, - group_by_types, - payload_sizes, - group_by_handles, - storage_manager)); - } else { + } else if (is_aggregate_partitioned_) { partitioned_group_by_hashtable_pool_.reset( new PartitionedHashTablePool(estimated_num_entries, FLAGS_num_aggregation_partitions, hash_table_impl_type, - group_by_types, - payload_sizes, + group_by_types_, group_by_handles, storage_manager)); + } else { + group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries, + hash_table_impl_type, + group_by_types_, + group_by_handles, + storage_manager)); } } } @@ -269,7 +258,7 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto( proto.group_by_expressions(group_by_idx), database)); } - unique_ptr<Predicate> predicate; + std::unique_ptr<Predicate> predicate; if (proto.has_predicate()) { predicate.reset( PredicateFactory::ReconstructFromProto(proto.predicate(), database)); @@ -353,33 +342,72 @@ bool AggregationOperationState::ProtoIsValid( return true; } -void AggregationOperationState::aggregateBlock(const block_id input_block, - LIPFilterAdaptiveProber *lip_filter_adaptive_prober) { - if (group_by_list_.empty()) { - aggregateBlockSingleState(input_block); +std::size_t AggregationOperationState::getNumPartitions() const { + if (is_aggregate_collision_free_) { + return static_cast<CollisionFreeAggregationStateHashTable *>( + collision_free_hashtable_.get())->getNumFinalizationPartitions(); + } else if (is_aggregate_partitioned_) { + return partitioned_group_by_hashtable_pool_->getNumPartitions(); + } else { + return 1u; + } +} + +std::size_t AggregationOperationState::getNumInitializationPartitions() const { + if (is_aggregate_collision_free_) { + return static_cast<CollisionFreeAggregationStateHashTable *>( + collision_free_hashtable_.get())->getNumInitializationPartitions(); } else { - aggregateBlockHashTable(input_block, lip_filter_adaptive_prober); + return 0u; } } -void AggregationOperationState::finalizeAggregate( - InsertDestination *output_destination) { - if (group_by_list_.empty()) { - finalizeSingleState(output_destination); +void AggregationOperationState::initializeState(const std::size_t partition_id) { + if (is_aggregate_collision_free_) { + static_cast<CollisionFreeAggregationStateHashTable *>( + 
collision_free_hashtable_.get())->initialize(partition_id); } else { - finalizeHashTable(output_destination); + LOG(FATAL) << "AggregationOperationState::initializeState() " + << "is not supported by this aggregation"; } } -void AggregationOperationState::mergeSingleState( - const std::vector<std::unique_ptr<AggregationState>> &local_state) { - DEBUG_ASSERT(local_state.size() == single_states_.size()); - for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { - if (!is_distinct_[agg_idx]) { - handles_[agg_idx]->mergeStates(*local_state[agg_idx], - single_states_[agg_idx].get()); +bool AggregationOperationState::checkAggregatePartitioned( + const std::size_t estimated_num_groups, + const std::vector<bool> &is_distinct, + const std::vector<std::unique_ptr<const Scalar>> &group_by, + const std::vector<const AggregateFunction *> &aggregate_functions) const { + // If there's no aggregation, return false. + if (aggregate_functions.empty()) { + return false; + } + // Check if there's a distinct operation involved in any aggregate, if so + // the aggregate can't be partitioned. + for (auto distinct : is_distinct) { + if (distinct) { + return false; } } + // There's no distinct aggregation involved, Check if there's at least one + // GROUP BY operation. + if (group_by.empty()) { + return false; + } + // There are GROUP BYs without DISTINCT. Check if the estimated number of + // groups is large enough to warrant a partitioned aggregation. + return estimated_num_groups > + static_cast<std::size_t>( + FLAGS_partition_aggregation_num_groups_threshold); + return false; +} + +void AggregationOperationState::aggregateBlock(const block_id input_block, + LIPFilterAdaptiveProber *lip_filter_adaptive_prober) { + if (group_by_key_ids_.empty()) { + aggregateBlockSingleState(input_block); + } else { + aggregateBlockHashTable(input_block, lip_filter_adaptive_prober); + } } void AggregationOperationState::aggregateBlockSingleState( @@ -392,114 +420,137 @@ void AggregationOperationState::aggregateBlockSingleState( std::unique_ptr<TupleIdSequence> matches; if (predicate_ != nullptr) { - std::unique_ptr<ValueAccessor> accessor( - block->getTupleStorageSubBlock().createValueAccessor()); matches.reset(block->getMatchesForPredicate(predicate_.get(), matches.get())); } - for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { - const std::vector<attribute_id> *local_arguments_as_attributes = nullptr; -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - // If all arguments are attributes of the input relation, elide a copy. - if (!arguments_as_attributes_[agg_idx].empty()) { - local_arguments_as_attributes = &(arguments_as_attributes_[agg_idx]); + const auto &tuple_store = block->getTupleStorageSubBlock(); + std::unique_ptr<ValueAccessor> accessor( + tuple_store.createValueAccessor(matches.get())); + + ColumnVectorsValueAccessor non_trivial_results; + if (!non_trivial_expressions_.empty()) { + SubBlocksReference sub_blocks_ref(tuple_store, + block->getIndices(), + block->getIndicesConsistent()); + for (const auto &expression : non_trivial_expressions_) { + non_trivial_results.addColumn( + expression->getAllValues(accessor.get(), &sub_blocks_ref)); } -#endif - if (is_distinct_[agg_idx]) { - // Call StorageBlock::aggregateDistinct() to put the arguments as keys - // directly into the (threadsafe) shared global distinctify HashTable - // for this aggregate. 
- block->aggregateDistinct(*handles_[agg_idx], - arguments_[agg_idx], - local_arguments_as_attributes, - {}, /* group_by */ - matches.get(), - distinctify_hashtables_[agg_idx].get(), - nullptr /* reuse_group_by_vectors */); - local_state.emplace_back(nullptr); + } + + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { + const auto &argument_ids = argument_ids_[agg_idx]; + const auto &handle = handles_[agg_idx]; + + AggregationState *state; + if (argument_ids.empty()) { + // Special case. This is a nullary aggregate (i.e. COUNT(*)). + state = handle->accumulateNullary(matches == nullptr ? tuple_store.numTuples() + : matches->size()); } else { - // Call StorageBlock::aggregate() to actually do the aggregation. - local_state.emplace_back(block->aggregate(*handles_[agg_idx], - arguments_[agg_idx], - local_arguments_as_attributes, - matches.get())); + // Have the AggregationHandle actually do the aggregation. + state = handle->accumulate(accessor.get(), &non_trivial_results, argument_ids); } + local_state.emplace_back(state); } // Merge per-block aggregation states back with global state. mergeSingleState(local_state); } +void AggregationOperationState::mergeSingleState( + const std::vector<std::unique_ptr<AggregationState>> &local_state) { + DEBUG_ASSERT(local_state.size() == single_states_.size()); + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { + if (!is_distinct_[agg_idx]) { + handles_[agg_idx]->mergeStates(*local_state[agg_idx], + single_states_[agg_idx].get()); + } + } +} + +void AggregationOperationState::mergeGroupByHashTables( + AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) const { + HashTableMergerFast merger(dst); + static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(src) + ->forEach(&merger); +} + void AggregationOperationState::aggregateBlockHashTable( const block_id input_block, LIPFilterAdaptiveProber *lip_filter_adaptive_prober) { BlockReference block( storage_manager_->getBlock(input_block, input_relation_)); + const auto &tuple_store = block->getTupleStorageSubBlock(); + std::unique_ptr<ValueAccessor> base_accessor(tuple_store.createValueAccessor()); + std::unique_ptr<ValueAccessor> shared_accessor; + ValueAccessor *accessor = base_accessor.get(); // Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence // as the existence map for the tuples. std::unique_ptr<TupleIdSequence> matches; if (predicate_ != nullptr) { matches.reset(block->getMatchesForPredicate(predicate_.get())); + shared_accessor.reset( + base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches)); + accessor = shared_accessor.get(); } if (lip_filter_adaptive_prober != nullptr) { - std::unique_ptr<ValueAccessor> accessor( - block->getTupleStorageSubBlock().createValueAccessor(matches.get())); - matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor.get())); + matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor)); + shared_accessor.reset( + base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches)); + accessor = shared_accessor.get(); } - // This holds values of all the GROUP BY attributes so that the can be reused - // across multiple aggregates (i.e. we only pay the cost of evaluatin the - // GROUP BY expressions once). 
- std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors; - - for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { - if (is_distinct_[agg_idx]) { - // Call StorageBlock::aggregateDistinct() to insert the GROUP BY - // expression - // values and the aggregation arguments together as keys directly into the - // (threadsafe) shared global distinctify HashTable for this aggregate. - block->aggregateDistinct(*handles_[agg_idx], - arguments_[agg_idx], - nullptr, /* arguments_as_attributes */ - group_by_list_, - matches.get(), - distinctify_hashtables_[agg_idx].get(), - &reuse_group_by_vectors); + std::unique_ptr<ColumnVectorsValueAccessor> non_trivial_results; + if (!non_trivial_expressions_.empty()) { + non_trivial_results.reset(new ColumnVectorsValueAccessor()); + SubBlocksReference sub_blocks_ref(tuple_store, + block->getIndices(), + block->getIndicesConsistent()); + for (const auto &expression : non_trivial_expressions_) { + non_trivial_results->addColumn( + expression->getAllValues(accessor, &sub_blocks_ref)); } } - if (!is_aggregate_partitioned_) { - // Call StorageBlock::aggregateGroupBy() to aggregate this block's values - // directly into the (threadsafe) shared global HashTable for this - // aggregate. - DCHECK(group_by_hashtable_pool_ != nullptr); - AggregationStateHashTableBase *agg_hash_table = - group_by_hashtable_pool_->getHashTableFast(); - DCHECK(agg_hash_table != nullptr); - block->aggregateGroupBy(arguments_, - group_by_list_, - matches.get(), - agg_hash_table, - &reuse_group_by_vectors); - group_by_hashtable_pool_->returnHashTable(agg_hash_table); + accessor->beginIterationVirtual(); + + // TODO + if (is_aggregate_collision_free_) { + aggregateBlockHashTableImplCollisionFree( + accessor, non_trivial_results.get()); + } else if (is_aggregate_partitioned_) { + aggregateBlockHashTableImplPartitioned( + accessor, non_trivial_results.get()); } else { - ColumnVectorsValueAccessor temp_result; - // IDs of 'arguments' as attributes in the ValueAccessor we create below. - std::vector<attribute_id> argument_ids; + aggregateBlockHashTableImplThreadPrivate( + accessor, non_trivial_results.get()); + } +} - // IDs of GROUP BY key element(s) in the ValueAccessor we create below. - std::vector<attribute_id> key_ids; +void AggregationOperationState::aggregateBlockHashTableImplCollisionFree( + ValueAccessor *accessor, + ColumnVectorsValueAccessor *aux_accessor) { + DCHECK(collision_free_hashtable_ != nullptr); + + collision_free_hashtable_->upsertValueAccessor(argument_ids_, + group_by_key_ids_, + accessor, + aux_accessor); +} + +void AggregationOperationState::aggregateBlockHashTableImplPartitioned( + ValueAccessor *accessor, + ColumnVectorsValueAccessor *aux_accessor) { + DCHECK(partitioned_group_by_hashtable_pool_ != nullptr); + + InvokeOnValueAccessorMaybeTupleIdSequenceAdapter( + accessor, + [&](auto *accessor) -> void { // NOLINT(build/c++11) + // TODO(jianqiao): handle the situation when keys in non_trivial_results const std::size_t num_partitions = partitioned_group_by_hashtable_pool_->getNumPartitions(); - block->aggregateGroupByPartitioned( - arguments_, - group_by_list_, - matches.get(), - num_partitions, - &temp_result, - &argument_ids, - &key_ids, - &reuse_group_by_vectors); + // Compute the partitions for the tuple formed by group by values. 
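
// Assigning each tuple to a partition by hashing its GROUP BY key, as the
// TupleIdSequence bitmaps below do, can be sketched with std containers; the
// string keys and row-id lists here are simplified stand-ins for Quickstep's
// composite keys and bitmaps:

#include <cstddef>
#include <functional>
#include <string>
#include <vector>

std::vector<std::vector<std::size_t>> ComputePartitionMembership(
    const std::vector<std::string> &group_by_keys,  // One composite key per tuple.
    const std::size_t num_partitions) {
  std::vector<std::vector<std::size_t>> membership(num_partitions);
  for (std::size_t tuple_id = 0; tuple_id < group_by_keys.size(); ++tuple_id) {
    // Same idea as getTupleHash() % num_partitions below.
    const std::size_t partition =
        std::hash<std::string>{}(group_by_keys[tuple_id]) % num_partitions;
    membership[partition].push_back(tuple_id);
  }
  return membership;
}
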
std::vector<std::unique_ptr<TupleIdSequence>> partition_membership; partition_membership.resize(num_partitions); @@ -507,32 +558,57 @@ void AggregationOperationState::aggregateBlockHashTable( // Create a tuple-id sequence for each partition. for (std::size_t partition = 0; partition < num_partitions; ++partition) { partition_membership[partition].reset( - new TupleIdSequence(temp_result.getEndPosition())); + new TupleIdSequence(accessor->getEndPosition())); } // Iterate over ValueAccessor for each tuple, // set a bit in the appropriate TupleIdSequence. - temp_result.beginIteration(); - while (temp_result.next()) { + while (accessor->next()) { // We need a unique_ptr because getTupleWithAttributes() uses "new". - std::unique_ptr<Tuple> curr_tuple(temp_result.getTupleWithAttributes(key_ids)); + std::unique_ptr<Tuple> curr_tuple( + accessor->getTupleWithAttributes(group_by_key_ids_)); const std::size_t curr_tuple_partition_id = curr_tuple->getTupleHash() % num_partitions; partition_membership[curr_tuple_partition_id]->set( - temp_result.getCurrentPosition(), true); + accessor->getCurrentPosition(), true); } - // For each partition, create an adapter around Value Accessor and - // TupleIdSequence. - std::vector<std::unique_ptr< - TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter; - adapter.resize(num_partitions); + // Aggregate each partition. for (std::size_t partition = 0; partition < num_partitions; ++partition) { - adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter( - *(partition_membership)[partition])); + std::unique_ptr<ValueAccessor> adapter( + accessor->createSharedTupleIdSequenceAdapter( + *(partition_membership)[partition])); partitioned_group_by_hashtable_pool_->getHashTable(partition) - ->upsertValueAccessorCompositeKeyFast( - argument_ids, adapter[partition].get(), key_ids, true); + ->upsertValueAccessor(argument_ids_, + group_by_key_ids_, + adapter.get(), + aux_accessor); } + }); +} + +void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate( + ValueAccessor *accessor, + ColumnVectorsValueAccessor *aux_accessor) { + DCHECK(group_by_hashtable_pool_ != nullptr); + + AggregationStateHashTableBase *agg_hash_table = + group_by_hashtable_pool_->getHashTable(); + + agg_hash_table->upsertValueAccessor(argument_ids_, + group_by_key_ids_, + accessor, + aux_accessor); + group_by_hashtable_pool_->returnHashTable(agg_hash_table); +} + +void AggregationOperationState::finalizeAggregate( + const std::size_t partition_id, + InsertDestination *output_destination) { + if (group_by_key_ids_.empty()) { + DCHECK_EQ(0u, partition_id); + finalizeSingleState(output_destination); + } else { + finalizeHashTable(partition_id, output_destination); } } @@ -543,12 +619,6 @@ void AggregationOperationState::finalizeSingleState( std::vector<TypedValue> attribute_values; for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { - if (is_distinct_[agg_idx]) { - single_states_[agg_idx].reset( - handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle( - *distinctify_hashtables_[agg_idx])); - } - attribute_values.emplace_back( handles_[agg_idx]->finalize(*single_states_[agg_idx])); } @@ -556,80 +626,79 @@ void AggregationOperationState::finalizeSingleState( output_destination->insertTuple(Tuple(std::move(attribute_values))); } -void AggregationOperationState::mergeGroupByHashTables( - AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) { - HashTableMergerFast merger(dst); - (static_cast<FastHashTable<true, false, true, 
false> *>(src)) - ->forEachCompositeKeyFast(&merger); +void AggregationOperationState::finalizeHashTable( + const std::size_t partition_id, + InsertDestination *output_destination) { + if (is_aggregate_collision_free_) { + finalizeHashTableImplCollisionFree(partition_id, output_destination); + } else if (is_aggregate_partitioned_) { + finalizeHashTableImplPartitioned(partition_id, output_destination); + } else { + DCHECK_EQ(0u, partition_id); + finalizeHashTableImplThreadPrivate(output_destination); + } } -void AggregationOperationState::finalizeHashTable( +void AggregationOperationState::finalizeHashTableImplCollisionFree( + const std::size_t partition_id, InsertDestination *output_destination) { - // Each element of 'group_by_keys' is a vector of values for a particular - // group (which is also the prefix of the finalized Tuple for that group). - std::vector<std::vector<TypedValue>> group_by_keys; + std::vector<std::unique_ptr<ColumnVector>> final_values; + CollisionFreeAggregationStateHashTable *hash_table = + static_cast<CollisionFreeAggregationStateHashTable *>( + collision_free_hashtable_.get()); - // TODO(harshad) - The merge phase may be slower when each hash table contains - // large number of entries. We should find ways in which we can perform a - // parallel merge. + // TODO + const std::size_t max_length = + hash_table->getNumTuplesInPartition(partition_id); + ColumnVectorsValueAccessor complete_result; - // TODO(harshad) - Find heuristics for faster merge, even in a single thread. - // e.g. Keep merging entries from smaller hash tables to larger. + DCHECK_EQ(1u, group_by_types_.size()); + const Type *key_type = group_by_types_.front(); + DCHECK(NativeColumnVector::UsableForType(*key_type)); - auto *hash_tables = group_by_hashtable_pool_->getAllHashTables(); - if (hash_tables->size() > 1) { - for (int hash_table_index = 0; - hash_table_index < static_cast<int>(hash_tables->size() - 1); - ++hash_table_index) { - // Merge each hash table to the last hash table. - mergeGroupByHashTables((*hash_tables)[hash_table_index].get(), - hash_tables->back().get()); + std::unique_ptr<NativeColumnVector> key_cv( + new NativeColumnVector(*key_type, max_length)); + hash_table->finalizeKey(partition_id, key_cv.get()); + complete_result.addColumn(key_cv.release()); + + for (std::size_t i = 0; i < handles_.size(); ++i) { + if (handles_[i]->getAggregationID() == AggregationID::kDistinct) { + DCHECK_EQ(1u, handles_.size()); + break; } + + const Type *result_type = handles_[i]->getResultType(); + DCHECK(NativeColumnVector::UsableForType(*result_type)); + + std::unique_ptr<NativeColumnVector> result_cv( + new NativeColumnVector(*result_type, max_length)); + hash_table->finalizeState(partition_id, i, result_cv.get()); + complete_result.addColumn(result_cv.release()); } + // Bulk-insert the complete result. + output_destination->bulkInsertTuples(&complete_result); +} + +void AggregationOperationState::finalizeHashTableImplPartitioned( + const std::size_t partition_id, + InsertDestination *output_destination) { + // Each element of 'group_by_keys' is a vector of values for a particular + // group (which is also the prefix of the finalized Tuple for that group). + std::vector<std::vector<TypedValue>> group_by_keys; + // Collect per-aggregate finalized values. 
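
// The collision-free path above finalizes a dense, key-indexed state array
// into column vectors: one key column plus one column per aggregate. A
// minimal sketch, assuming a single SUM aggregate over integer keys in
// [partition_begin, partition_end); the types are simplified stand-ins:

#include <cstddef>
#include <cstdint>
#include <vector>

struct FinalizedColumns {
  std::vector<std::int64_t> keys;
  std::vector<std::int64_t> sums;
};

FinalizedColumns FinalizeDenseStates(const std::vector<std::int64_t> &sums,
                                     const std::vector<bool> &key_exists,
                                     const std::size_t partition_begin,
                                     const std::size_t partition_end) {
  FinalizedColumns result;
  // Each slot's index doubles as the group-by key, so no key storage or
  // collision handling is needed; only slots actually touched are emitted.
  for (std::size_t key = partition_begin; key < partition_end; ++key) {
    if (key_exists[key]) {
      result.keys.push_back(static_cast<std::int64_t>(key));
      result.sums.push_back(sums[key]);
    }
  }
  return result;
}
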
std::vector<std::unique_ptr<ColumnVector>> final_values; + AggregationStateHashTableBase *hash_table = + partitioned_group_by_hashtable_pool_->getHashTable(partition_id); for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { - if (is_distinct_[agg_idx]) { - DCHECK(group_by_hashtable_pool_ != nullptr); - auto *hash_tables = group_by_hashtable_pool_->getAllHashTables(); - DCHECK(hash_tables != nullptr); - if (hash_tables->empty()) { - // We may have a case where hash_tables is empty, e.g. no input blocks. - // However for aggregateOnDistinctifyHashTableForGroupBy to work - // correctly, we should create an empty group by hash table. - AggregationStateHashTableBase *new_hash_table = - group_by_hashtable_pool_->getHashTableFast(); - group_by_hashtable_pool_->returnHashTable(new_hash_table); - hash_tables = group_by_hashtable_pool_->getAllHashTables(); - } - DCHECK(hash_tables->back() != nullptr); - AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get(); - DCHECK(agg_hash_table != nullptr); - handles_[agg_idx]->allowUpdate(); - handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy( - *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx); - } - - auto *hash_tables = group_by_hashtable_pool_->getAllHashTables(); - DCHECK(hash_tables != nullptr); - if (hash_tables->empty()) { - // We may have a case where hash_tables is empty, e.g. no input blocks. - // However for aggregateOnDistinctifyHashTableForGroupBy to work - // correctly, we should create an empty group by hash table. - AggregationStateHashTableBase *new_hash_table = - group_by_hashtable_pool_->getHashTableFast(); - group_by_hashtable_pool_->returnHashTable(new_hash_table); - hash_tables = group_by_hashtable_pool_->getAllHashTables(); - } - AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get(); - DCHECK(agg_hash_table != nullptr); ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable( - *agg_hash_table, &group_by_keys, agg_idx); + *hash_table, &group_by_keys, agg_idx); if (agg_result_col != nullptr) { final_values.emplace_back(agg_result_col); } } + hash_table->destroyPayload(); // Reorganize 'group_by_keys' in column-major order so that we can make a // ColumnVectorsValueAccessor to bulk-insert results. @@ -640,23 +709,20 @@ void AggregationOperationState::finalizeHashTable( // in a single HashTable. 
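
// Reorganizing the row-major 'group_by_keys' (one vector of key values per
// group) into column-major vectors for bulk insertion, as done below, is a
// simple transpose. A minimal sketch using strings as a stand-in for
// TypedValue:

#include <cstddef>
#include <string>
#include <utility>
#include <vector>

std::vector<std::vector<std::string>> ToColumnMajor(
    std::vector<std::vector<std::string>> &&group_by_keys,
    const std::size_t num_key_columns) {
  std::vector<std::vector<std::string>> columns(num_key_columns);
  for (std::vector<std::string> &group_key : group_by_keys) {
    for (std::size_t col = 0; col < num_key_columns; ++col) {
      // Move each key element into its destination column.
      columns[col].emplace_back(std::move(group_key[col]));
    }
  }
  return columns;
}
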
std::vector<std::unique_ptr<ColumnVector>> group_by_cvs; std::size_t group_by_element_idx = 0; - for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) { - const Type &group_by_type = group_by_element->getType(); - if (NativeColumnVector::UsableForType(group_by_type)) { + for (const Type *group_by_type : group_by_types_) { + if (NativeColumnVector::UsableForType(*group_by_type)) { NativeColumnVector *element_cv = - new NativeColumnVector(group_by_type, group_by_keys.size()); + new NativeColumnVector(*group_by_type, group_by_keys.size()); group_by_cvs.emplace_back(element_cv); for (std::vector<TypedValue> &group_key : group_by_keys) { - element_cv->appendTypedValue( - std::move(group_key[group_by_element_idx])); + element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); } } else { IndirectColumnVector *element_cv = - new IndirectColumnVector(group_by_type, group_by_keys.size()); + new IndirectColumnVector(*group_by_type, group_by_keys.size()); group_by_cvs.emplace_back(element_cv); for (std::vector<TypedValue> &group_key : group_by_keys) { - element_cv->appendTypedValue( - std::move(group_key[group_by_element_idx])); + element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); } } ++group_by_element_idx; @@ -676,42 +742,44 @@ void AggregationOperationState::finalizeHashTable( output_destination->bulkInsertTuples(&complete_result); } -void AggregationOperationState::destroyAggregationHashTablePayload() { - std::vector<std::unique_ptr<AggregationStateHashTableBase>> *all_hash_tables = - nullptr; - if (!is_aggregate_partitioned_) { - if (group_by_hashtable_pool_ != nullptr) { - all_hash_tables = group_by_hashtable_pool_->getAllHashTables(); - } - } else { - if (partitioned_group_by_hashtable_pool_ != nullptr) { - all_hash_tables = partitioned_group_by_hashtable_pool_->getAllHashTables(); - } - } - if (all_hash_tables != nullptr) { - for (std::size_t ht_index = 0; ht_index < all_hash_tables->size(); ++ht_index) { - (*all_hash_tables)[ht_index]->destroyPayload(); - } - } -} - -void AggregationOperationState::finalizeAggregatePartitioned( - const std::size_t partition_id, InsertDestination *output_destination) { +void AggregationOperationState::finalizeHashTableImplThreadPrivate( + InsertDestination *output_destination) { // Each element of 'group_by_keys' is a vector of values for a particular // group (which is also the prefix of the finalized Tuple for that group). std::vector<std::vector<TypedValue>> group_by_keys; + // TODO(harshad) - The merge phase may be slower when each hash table contains + // large number of entries. We should find ways in which we can perform a + // parallel merge. + + // TODO(harshad) - Find heuristics for faster merge, even in a single thread. + // e.g. Keep merging entries from smaller hash tables to larger. + + auto *hash_tables = group_by_hashtable_pool_->getAllHashTables(); + DCHECK(hash_tables != nullptr); + if (hash_tables->empty()) { + return; + } + + std::unique_ptr<AggregationStateHashTableBase> final_hash_table( + hash_tables->back().release()); + for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) { + std::unique_ptr<AggregationStateHashTableBase> hash_table( + hash_tables->at(i).release()); + mergeGroupByHashTables(hash_table.get(), final_hash_table.get()); + hash_table->destroyPayload(); + } + // Collect per-aggregate finalized values. 
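
// Merging the thread-private hash tables into a single table before
// finalization, as the merge loop above does, amounts to folding each
// per-thread map of partial states into one destination map. A minimal sketch
// for a SUM aggregate, using std::unordered_map in place of the aggregation
// hash table:

#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

using PartialSums = std::unordered_map<std::string, std::int64_t>;

PartialSums MergeThreadPrivateTables(std::vector<PartialSums> &&per_thread_tables) {
  if (per_thread_tables.empty()) {
    return PartialSums();
  }
  // Keep the last table and fold the others into it, mirroring the code
  // above, which merges everything into hash_tables->back().
  PartialSums final_table = std::move(per_thread_tables.back());
  per_thread_tables.pop_back();
  for (PartialSums &table : per_thread_tables) {
    for (auto &entry : table) {
      final_table[entry.first] += entry.second;
    }
  }
  return final_table;
}
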
std::vector<std::unique_ptr<ColumnVector>> final_values; for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { - AggregationStateHashTableBase *hash_table = - partitioned_group_by_hashtable_pool_->getHashTable(partition_id); ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable( - *hash_table, &group_by_keys, agg_idx); + *final_hash_table, &group_by_keys, agg_idx); if (agg_result_col != nullptr) { final_values.emplace_back(agg_result_col); } } + final_hash_table->destroyPayload(); // Reorganize 'group_by_keys' in column-major order so that we can make a // ColumnVectorsValueAccessor to bulk-insert results. @@ -722,19 +790,22 @@ void AggregationOperationState::finalizeAggregatePartitioned( // in a single HashTable. std::vector<std::unique_ptr<ColumnVector>> group_by_cvs; std::size_t group_by_element_idx = 0; - for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) { - const Type &group_by_type = group_by_element->getType(); - if (NativeColumnVector::UsableForType(group_by_type)) { - NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size()); + for (const Type *group_by_type : group_by_types_) { + if (NativeColumnVector::UsableForType(*group_by_type)) { + NativeColumnVector *element_cv = + new NativeColumnVector(*group_by_type, group_by_keys.size()); group_by_cvs.emplace_back(element_cv); for (std::vector<TypedValue> &group_key : group_by_keys) { - element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); + element_cv->appendTypedValue( + std::move(group_key[group_by_element_idx])); } } else { - IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size()); + IndirectColumnVector *element_cv = + new IndirectColumnVector(*group_by_type, group_by_keys.size()); group_by_cvs.emplace_back(element_cv); for (std::vector<TypedValue> &group_key : group_by_keys) { - element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); + element_cv->appendTypedValue( + std::move(group_key[group_by_element_idx])); } } ++group_by_element_idx;