Repository: incubator-quickstep Updated Branches: refs/heads/collision-free-agg 963a60428 -> 12b112f85
Updates Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/12b112f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/12b112f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/12b112f8 Branch: refs/heads/collision-free-agg Commit: 12b112f851608c7f423cb9bca780b1be0489e55d Parents: 963a604 Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Tue Jan 31 23:39:41 2017 -0600 Committer: Jianqiao Zhu <jianq...@cs.wisc.edu> Committed: Tue Jan 31 23:39:41 2017 -0600 ---------------------------------------------------------------------- .../aggregation/AggregationConcreteHandle.cpp | 23 ++++---- .../aggregation/AggregationConcreteHandle.hpp | 14 ++--- expressions/aggregation/AggregationHandle.hpp | 11 ++-- .../aggregation/AggregationHandleSum.cpp | 4 +- query_optimizer/ExecutionGenerator.cpp | 2 +- .../InitializeAggregationStateOperator.cpp | 2 +- storage/AggregationOperationState.cpp | 56 +++++++++++++++++--- storage/AggregationOperationState.hpp | 3 +- .../CollisionFreeAggregationStateHashTable.cpp | 3 +- 9 files changed, 76 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/expressions/aggregation/AggregationConcreteHandle.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp index c3d133a..d28aa6e 100644 --- a/expressions/aggregation/AggregationConcreteHandle.cpp +++ b/expressions/aggregation/AggregationConcreteHandle.cpp @@ -48,21 +48,16 @@ AggregationStateHashTableBase* AggregationConcreteHandle::createDistinctifyHashT } void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable( + const std::vector<attribute_id> &argument_ids, + const std::vector<attribute_id> &key_attr_ids, + AggregationStateHashTableBase *distinctify_hash_table, ValueAccessor *accessor, - const std::vector<attribute_id> &key_ids, - AggregationStateHashTableBase *distinctify_hash_table) const { - // If the key-value pair is already there, we don't need to update the value, - // which should always be "true". I.e. the value is just a placeholder. -// AggregationStateFastHashTable *hash_table = -// static_cast<AggregationStateFastHashTable *>(distinctify_hash_table); -// if (key_ids.size() == 1) { -// hash_table->upsertValueAccessorFast( -// key_ids, accessor, key_ids[0], true /* check_for_null_keys */); -// } else { -// std::vector<attribute_id> empty_args {kInvalidAttributeID}; -// hash_table->upsertValueAccessorCompositeKeyFast( -// empty_args, accessor, key_ids, true /* check_for_null_keys */); -// } + ColumnVectorsValueAccessor *aux_accessor) const { + std::vector<attribute_id> combined_ids(key_attr_ids); + combined_ids.insert(combined_ids.end(), argument_ids.begin(), argument_ids.end()); + + static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(distinctify_hash_table) + ->upsertValueAccessor({}, combined_ids, accessor, aux_accessor); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/expressions/aggregation/AggregationConcreteHandle.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp index 04be232..c49f597 100644 --- a/expressions/aggregation/AggregationConcreteHandle.hpp +++ b/expressions/aggregation/AggregationConcreteHandle.hpp @@ -110,9 +110,11 @@ class AggregationConcreteHandle : public AggregationHandle { StorageManager *storage_manager) const override; void insertValueAccessorIntoDistinctifyHashTable( + const std::vector<attribute_id> &argument_ids, + const std::vector<attribute_id> &key_attr_ids, + AggregationStateHashTableBase *distinctify_hash_table, ValueAccessor *accessor, - const std::vector<attribute_id> &key_ids, - AggregationStateHashTableBase *distinctify_hash_table) const override; + ColumnVectorsValueAccessor *aux_accessor = nullptr) const override; void blockUpdate() override { block_update_ = true; @@ -127,11 +129,11 @@ class AggregationConcreteHandle : public AggregationHandle { : AggregationHandle(agg_id) {} template <typename HandleT, typename StateT> - StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelperFast( + StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelper( const AggregationStateHashTableBase &distinctify_hash_table) const; template <typename HandleT, typename HashTableT> - void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast( + void aggregateOnDistinctifyHashTableForGroupByUnaryHelper( const AggregationStateHashTableBase &distinctify_hash_table, AggregationStateHashTableBase *hash_table, std::size_t index) const; @@ -200,7 +202,7 @@ class HashTableAggregateFinalizer { template <typename HandleT, typename StateT> StateT* AggregationConcreteHandle:: - aggregateOnDistinctifyHashTableForSingleUnaryHelperFast( + aggregateOnDistinctifyHashTableForSingleUnaryHelper( const AggregationStateHashTableBase &distinctify_hash_table) const { const HandleT &handle = static_cast<const HandleT &>(*this); StateT *state = static_cast<StateT *>(createInitialState()); @@ -226,7 +228,7 @@ StateT* AggregationConcreteHandle:: template <typename HandleT, typename HashTableT> void AggregationConcreteHandle:: - aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast( + aggregateOnDistinctifyHashTableForGroupByUnaryHelper( const AggregationStateHashTableBase &distinctify_hash_table, AggregationStateHashTableBase *aggregation_hash_table, std::size_t index) const { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/expressions/aggregation/AggregationHandle.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp index bc9c27f..fd7f7af 100644 --- a/expressions/aggregation/AggregationHandle.hpp +++ b/expressions/aggregation/AggregationHandle.hpp @@ -249,11 +249,6 @@ class AggregationHandle { * @brief Inserts the GROUP BY expressions and aggregation arguments together * as keys into the distinctify hash table. * - * @param accessor The ValueAccessor that will be iterated over to read - * tuples. - * @param key_ids The attribute_ids of the GROUP BY expressions in accessor - * together with the attribute_ids of the arguments to this aggregate - * in accessor, in order. * @param distinctify_hash_table The HashTable to store the GROUP BY * expressions and the aggregation arguments together as hash table * keys and a bool constant \c true as hash table value (So the hash @@ -261,9 +256,11 @@ class AggregationHandle { * by calling createDistinctifyHashTable(); */ virtual void insertValueAccessorIntoDistinctifyHashTable( + const std::vector<attribute_id> &argument_ids, + const std::vector<attribute_id> &key_attr_ids, + AggregationStateHashTableBase *distinctify_hash_table, ValueAccessor *accessor, - const std::vector<attribute_id> &key_ids, - AggregationStateHashTableBase *distinctify_hash_table) const = 0; + ColumnVectorsValueAccessor *aux_accessor = nullptr) const = 0; /** * @brief Perform single (i.e. without GROUP BY) aggregation on the keys from http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/expressions/aggregation/AggregationHandleSum.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp index 29a986f..ce36e79 100644 --- a/expressions/aggregation/AggregationHandleSum.cpp +++ b/expressions/aggregation/AggregationHandleSum.cpp @@ -152,7 +152,7 @@ ColumnVector* AggregationHandleSum::finalizeHashTable( AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle( const AggregationStateHashTableBase &distinctify_hash_table) const { - return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< + return aggregateOnDistinctifyHashTableForSingleUnaryHelper< AggregationHandleSum, AggregationStateSum>(distinctify_hash_table); } @@ -161,7 +161,7 @@ void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, AggregationStateHashTableBase *aggregation_hash_table, std::size_t index) const { - aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< + aggregateOnDistinctifyHashTableForGroupByUnaryHelper< AggregationHandleSum, PackedPayloadSeparateChainingAggregationStateHashTable>( distinctify_hash_table, aggregation_hash_table, index); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index d32505b..76522dc 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -1657,7 +1657,7 @@ void ExecutionGenerator::convertAggregate( execution_plan_->addDirectDependency(aggregation_operator_index, initialize_aggregation_state_operator_index, - true); + true /* is_pipeline_breaker */); } // Create InsertDestination proto. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/relational_operators/InitializeAggregationStateOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/InitializeAggregationStateOperator.cpp b/relational_operators/InitializeAggregationStateOperator.cpp index dfee459..b041aef 100644 --- a/relational_operators/InitializeAggregationStateOperator.cpp +++ b/relational_operators/InitializeAggregationStateOperator.cpp @@ -53,7 +53,7 @@ bool InitializeAggregationStateOperator::getAllWorkOrders( } started_ = true; } - return started_; + return true; } bool InitializeAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index 1bc5832..6e7d2ae 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -136,6 +136,9 @@ AggregationOperationState::AggregationOperationState( std::vector<std::vector<std::unique_ptr<const Scalar>>>::iterator args_it = arguments.begin(); std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin(); + std::vector<HashTableImplType>::const_iterator + distinctify_hash_table_impl_types_it = + distinctify_hash_table_impl_types.begin(); for (; agg_func_it != aggregate_functions.end(); ++agg_func_it, ++args_it, ++is_distinct_it) { // Get the Types of this aggregate's arguments so that we can create an @@ -174,13 +177,35 @@ AggregationOperationState::AggregationOperationState( // Aggregation with GROUP BY: combined payload is partially updated in // the presence of DISTINCT. if (*is_distinct_it) { - LOG(FATAL) << "Distinct aggregation not supported"; + handles_.back()->blockUpdate(); } group_by_handles.emplace_back(handles_.back().get()); } else { // Aggregation without GROUP BY: create a single global state. single_states_.emplace_back(handles_.back()->createInitialState()); } + + // Initialize the corresponding distinctify hash table if this is a + // DISTINCT aggregation. + if (*is_distinct_it) { + std::vector<const Type *> key_types(group_by_types_); + key_types.insert( + key_types.end(), argument_types.begin(), argument_types.end()); + // TODO(jianqiao): estimated_num_entries is quite inaccurate for + // estimating the number of entries in the distinctify hash table. + // We need to estimate for each distinct aggregation an + // estimated_num_distinct_keys value during query optimization. + distinctify_hashtables_.emplace_back( + AggregationStateHashTableFactory::CreateResizable( + *distinctify_hash_table_impl_types_it, + key_types, + estimated_num_entries, + {}, + storage_manager)); + ++distinctify_hash_table_impl_types_it; + } else { + distinctify_hashtables_.emplace_back(nullptr); + } } // Aggregation with GROUP BY: create a HashTable pool. @@ -439,14 +464,23 @@ void AggregationOperationState::aggregateBlockSingleState( const auto &argument_ids = argument_ids_[agg_idx]; const auto &handle = handles_[agg_idx]; - AggregationState *state; - if (argument_ids.empty()) { - // Special case. This is a nullary aggregate (i.e. COUNT(*)). - state = handle->accumulateNullary(matches == nullptr ? tuple_store.numTuples() - : matches->size()); + AggregationState *state = nullptr; + if (is_distinct_[agg_idx]) { + handle->insertValueAccessorIntoDistinctifyHashTable( + argument_ids, + {}, + distinctify_hashtables_[agg_idx].get(), + accessor.get(), + &non_trivial_results); } else { - // Have the AggregationHandle actually do the aggregation. - state = handle->accumulate(accessor.get(), &non_trivial_results, argument_ids); + if (argument_ids.empty()) { + // Special case. This is a nullary aggregate (i.e. COUNT(*)). + state = handle->accumulateNullary(matches == nullptr ? tuple_store.numTuples() + : matches->size()); + } else { + // Have the AggregationHandle actually do the aggregation. + state = handle->accumulate(accessor.get(), &non_trivial_results, argument_ids); + } } local_state.emplace_back(state); } @@ -616,6 +650,12 @@ void AggregationOperationState::finalizeSingleState( std::vector<TypedValue> attribute_values; for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { + if (is_distinct_[agg_idx]) { + single_states_[agg_idx].reset( + handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle( + *distinctify_hashtables_[agg_idx])); + } + attribute_values.emplace_back( handles_[agg_idx]->finalize(*single_states_[agg_idx])); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index 44803fc..5ee675a 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -261,8 +261,7 @@ class AggregationOperationState { std::vector<const Type *> group_by_types_; // Hash table for obtaining distinct (i.e. unique) arguments. -// std::vector<std::unique_ptr<AggregationStateHashTableBase>> -// distinctify_hashtables_; + std::vector<std::unique_ptr<AggregationStateHashTableBase>> distinctify_hashtables_; // Per-aggregate global states for aggregation without GROUP BY. std::vector<std::unique_ptr<AggregationState>> single_states_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/12b112f8/storage/CollisionFreeAggregationStateHashTable.cpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeAggregationStateHashTable.cpp b/storage/CollisionFreeAggregationStateHashTable.cpp index 15d4dfe..39560cc 100644 --- a/storage/CollisionFreeAggregationStateHashTable.cpp +++ b/storage/CollisionFreeAggregationStateHashTable.cpp @@ -111,7 +111,8 @@ CollisionFreeAggregationStateHashTable::CollisionFreeAggregationStateHashTable( } memory_size_ = required_memory; - num_init_partitions_ = std::min(memory_size_ / (4uL * 1024u * 1024u), 80uL); + num_init_partitions_ = + std::max(1uL, std::min(memory_size_ / (4uL * 1024u * 1024u), 80uL)); } CollisionFreeAggregationStateHashTable::~CollisionFreeAggregationStateHashTable() {