http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index 3f6e23a..073b813 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -59,7 +59,7 @@ namespace quickstep { AggregationOperationState::AggregationOperationState( const CatalogRelationSchema &input_relation, - const std::vector<const AggregateFunction*> &aggregate_functions, + const std::vector<const AggregateFunction *> &aggregate_functions, std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments, std::vector<bool> &&is_distinct, std::vector<std::unique_ptr<const Scalar>> &&group_by, @@ -78,11 +78,14 @@ AggregationOperationState::AggregationOperationState( DCHECK(aggregate_functions.size() == arguments_.size()); // Get the types of GROUP BY expressions for creating HashTables below. - std::vector<const Type*> group_by_types; + std::vector<const Type *> group_by_types; for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) { group_by_types.emplace_back(&group_by_element->getType()); } + std::vector<AggregationHandle *> group_by_handles; + group_by_handles.clear(); + if (aggregate_functions.size() == 0) { // If there is no aggregation function, then it is a distinctify operation // on the group-by expressions. 
@@ -91,26 +94,28 @@ AggregationOperationState::AggregationOperationState( handles_.emplace_back(new AggregationHandleDistinct()); arguments_.push_back({}); is_distinct_.emplace_back(false); - - group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>( - new HashTablePool(estimated_num_entries, - hash_table_impl_type, - group_by_types, - handles_.back().get(), - storage_manager))); + group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries, + hash_table_impl_type, + group_by_types, + {1}, + handles_, + storage_manager)); } else { // Set up each individual aggregate in this operation. - std::vector<const AggregateFunction*>::const_iterator agg_func_it - = aggregate_functions.begin(); - std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator args_it - = arguments_.begin(); + std::vector<const AggregateFunction *>::const_iterator agg_func_it = + aggregate_functions.begin(); + std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator + args_it = arguments_.begin(); std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin(); - std::vector<HashTableImplType>::const_iterator distinctify_hash_table_impl_types_it - = distinctify_hash_table_impl_types.begin(); - for (; agg_func_it != aggregate_functions.end(); ++agg_func_it, ++args_it, ++is_distinct_it) { + std::vector<HashTableImplType>::const_iterator + distinctify_hash_table_impl_types_it = + distinctify_hash_table_impl_types.begin(); + std::vector<std::size_t> payload_sizes; + for (; agg_func_it != aggregate_functions.end(); + ++agg_func_it, ++args_it, ++is_distinct_it) { // Get the Types of this aggregate's arguments so that we can create an // AggregationHandle. 
- std::vector<const Type*> argument_types; + std::vector<const Type *> argument_types; for (const std::unique_ptr<const Scalar> &argument : *args_it) { argument_types.emplace_back(&argument->getType()); } @@ -125,13 +130,13 @@ AggregationOperationState::AggregationOperationState( handles_.emplace_back((*agg_func_it)->createHandle(argument_types)); if (!group_by_list_.empty()) { - // Aggregation with GROUP BY: create a HashTable pool for per-group states. - group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>( - new HashTablePool(estimated_num_entries, - hash_table_impl_type, - group_by_types, - handles_.back().get(), - storage_manager))); + // Aggregation with GROUP BY: combined payload is partially updated in + // the presence of DISTINCT. + if (*is_distinct_it) { + handles_.back()->blockUpdate(); + } + group_by_handles.emplace_back(handles_.back()); + payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize()); } else { // Aggregation without GROUP BY: create a single global state. 
single_states_.emplace_back(handles_.back()->createInitialState()); @@ -143,40 +148,60 @@ AggregationOperationState::AggregationOperationState( std::vector<attribute_id> local_arguments_as_attributes; local_arguments_as_attributes.reserve(args_it->size()); for (const std::unique_ptr<const Scalar> &argument : *args_it) { - const attribute_id argument_id = argument->getAttributeIdForValueAccessor(); + const attribute_id argument_id = + argument->getAttributeIdForValueAccessor(); if (argument_id == -1) { local_arguments_as_attributes.clear(); break; } else { - DCHECK_EQ(input_relation_.getID(), argument->getRelationIdForValueAccessor()); + DCHECK_EQ(input_relation_.getID(), + argument->getRelationIdForValueAccessor()); local_arguments_as_attributes.push_back(argument_id); } } - arguments_as_attributes_.emplace_back(std::move(local_arguments_as_attributes)); + arguments_as_attributes_.emplace_back( + std::move(local_arguments_as_attributes)); #endif } - // Initialize the corresponding distinctify hash table if this is a DISTINCT + // Initialize the corresponding distinctify hash table if this is a + // DISTINCT // aggregation. if (*is_distinct_it) { - std::vector<const Type*> key_types(group_by_types); - key_types.insert(key_types.end(), argument_types.begin(), argument_types.end()); - // TODO(jianqiao): estimated_num_entries is quite inaccurate for estimating + std::vector<const Type *> key_types(group_by_types); + key_types.insert( + key_types.end(), argument_types.begin(), argument_types.end()); + // TODO(jianqiao): estimated_num_entries is quite inaccurate for + // estimating // the number of entries in the distinctify hash table. We may estimate - // for each distinct aggregation an estimated_num_distinct_keys value during + // for each distinct aggregation an estimated_num_distinct_keys value + // during // query optimization, if it worths. 
distinctify_hashtables_.emplace_back( - handles_.back()->createDistinctifyHashTable( + AggregationStateFastHashTableFactory::CreateResizable( *distinctify_hash_table_impl_types_it, key_types, estimated_num_entries, + {0}, + {}, storage_manager)); ++distinctify_hash_table_impl_types_it; } else { distinctify_hashtables_.emplace_back(nullptr); } } + + if (!group_by_handles.empty()) { + // Aggregation with GROUP BY: create a HashTable pool for per-group + // states. + group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries, + hash_table_impl_type, + group_by_types, + payload_sizes, + group_by_handles, + storage_manager)); + } } } @@ -187,7 +212,7 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto( DCHECK(ProtoIsValid(proto, database)); // Rebuild contructor arguments from their representation in 'proto'. - std::vector<const AggregateFunction*> aggregate_functions; + std::vector<const AggregateFunction *> aggregate_functions; std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments; std::vector<bool> is_distinct; std::vector<HashTableImplType> distinctify_hash_table_impl_types; @@ -200,62 +225,63 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto( arguments.emplace_back(); arguments.back().reserve(agg_proto.argument_size()); - for (int argument_idx = 0; argument_idx < agg_proto.argument_size(); ++argument_idx) { + for (int argument_idx = 0; argument_idx < agg_proto.argument_size(); + ++argument_idx) { arguments.back().emplace_back(ScalarFactory::ReconstructFromProto( - agg_proto.argument(argument_idx), - database)); + agg_proto.argument(argument_idx), database)); } is_distinct.emplace_back(agg_proto.is_distinct()); if (agg_proto.is_distinct()) { distinctify_hash_table_impl_types.emplace_back( - HashTableImplTypeFromProto( - proto.distinctify_hash_table_impl_types(distinctify_hash_table_impl_type_index))); + HashTableImplTypeFromProto(proto.distinctify_hash_table_impl_types( + 
distinctify_hash_table_impl_type_index))); ++distinctify_hash_table_impl_type_index; } } std::vector<std::unique_ptr<const Scalar>> group_by_expressions; - for (int group_by_idx = 0; - group_by_idx < proto.group_by_expressions_size(); + for (int group_by_idx = 0; group_by_idx < proto.group_by_expressions_size(); ++group_by_idx) { group_by_expressions.emplace_back(ScalarFactory::ReconstructFromProto( - proto.group_by_expressions(group_by_idx), - database)); + proto.group_by_expressions(group_by_idx), database)); } unique_ptr<Predicate> predicate; if (proto.has_predicate()) { predicate.reset( - PredicateFactory::ReconstructFromProto(proto.predicate(), - database)); + PredicateFactory::ReconstructFromProto(proto.predicate(), database)); } - return new AggregationOperationState(database.getRelationSchemaById(proto.relation_id()), - aggregate_functions, - std::move(arguments), - std::move(is_distinct), - std::move(group_by_expressions), - predicate.release(), - proto.estimated_num_entries(), - HashTableImplTypeFromProto(proto.hash_table_impl_type()), - distinctify_hash_table_impl_types, - storage_manager); + return new AggregationOperationState( + database.getRelationSchemaById(proto.relation_id()), + aggregate_functions, + std::move(arguments), + std::move(is_distinct), + std::move(group_by_expressions), + predicate.release(), + proto.estimated_num_entries(), + HashTableImplTypeFromProto(proto.hash_table_impl_type()), + distinctify_hash_table_impl_types, + storage_manager); } -bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOperationState &proto, - const CatalogDatabaseLite &database) { +bool AggregationOperationState::ProtoIsValid( + const serialization::AggregationOperationState &proto, + const CatalogDatabaseLite &database) { if (!proto.IsInitialized() || !database.hasRelationWithId(proto.relation_id()) || (proto.aggregates_size() < 0)) { return false; } - std::size_t num_distinctify_hash_tables = 
proto.distinctify_hash_table_impl_types_size(); + std::size_t num_distinctify_hash_tables = + proto.distinctify_hash_table_impl_types_size(); std::size_t distinctify_hash_table_impl_type_index = 0; for (int i = 0; i < proto.aggregates_size(); ++i) { - if (!AggregateFunctionFactory::ProtoIsValid(proto.aggregates(i).function())) { + if (!AggregateFunctionFactory::ProtoIsValid( + proto.aggregates(i).function())) { return false; } @@ -266,16 +292,18 @@ bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOpe for (int argument_idx = 0; argument_idx < proto.aggregates(i).argument_size(); ++argument_idx) { - if (!ScalarFactory::ProtoIsValid(proto.aggregates(i).argument(argument_idx), - database)) { + if (!ScalarFactory::ProtoIsValid( + proto.aggregates(i).argument(argument_idx), database)) { return false; } } if (proto.aggregates(i).is_distinct()) { - if (distinctify_hash_table_impl_type_index >= num_distinctify_hash_tables || + if (distinctify_hash_table_impl_type_index >= + num_distinctify_hash_tables || !serialization::HashTableImplType_IsValid( - proto.distinctify_hash_table_impl_types(distinctify_hash_table_impl_type_index))) { + proto.distinctify_hash_table_impl_types( + distinctify_hash_table_impl_type_index))) { return false; } } @@ -288,8 +316,9 @@ bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOpe } if (proto.group_by_expressions_size() > 0) { - if (!proto.has_hash_table_impl_type() - || !serialization::HashTableImplType_IsValid(proto.hash_table_impl_type())) { + if (!proto.has_hash_table_impl_type() || + !serialization::HashTableImplType_IsValid( + proto.hash_table_impl_type())) { return false; } } @@ -311,7 +340,8 @@ void AggregationOperationState::aggregateBlock(const block_id input_block) { } } -void AggregationOperationState::finalizeAggregate(InsertDestination *output_destination) { +void AggregationOperationState::finalizeAggregate( + InsertDestination *output_destination) { if (group_by_list_.empty()) 
{ finalizeSingleState(output_destination); } else { @@ -330,19 +360,19 @@ void AggregationOperationState::mergeSingleState( } } -void AggregationOperationState::aggregateBlockSingleState(const block_id input_block) { +void AggregationOperationState::aggregateBlockSingleState( + const block_id input_block) { // Aggregate per-block state for each aggregate. std::vector<std::unique_ptr<AggregationState>> local_state; - BlockReference block(storage_manager_->getBlock(input_block, input_relation_)); + BlockReference block( + storage_manager_->getBlock(input_block, input_relation_)); // If there is a filter predicate, 'reuse_matches' holds the set of matching // tuples so that it can be reused across multiple aggregates (i.e. we only // pay the cost of evaluating the predicate once). std::unique_ptr<TupleIdSequence> reuse_matches; - for (std::size_t agg_idx = 0; - agg_idx < handles_.size(); - ++agg_idx) { + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { const std::vector<attribute_id> *local_arguments_as_attributes = nullptr; #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION // If all arguments are attributes of the input relation, elide a copy. @@ -365,12 +395,11 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b local_state.emplace_back(nullptr); } else { // Call StorageBlock::aggregate() to actually do the aggregation. 
- local_state.emplace_back( - block->aggregate(*handles_[agg_idx], - arguments_[agg_idx], - local_arguments_as_attributes, - predicate_.get(), - &reuse_matches)); + local_state.emplace_back(block->aggregate(*handles_[agg_idx], + arguments_[agg_idx], + local_arguments_as_attributes, + predicate_.get(), + &reuse_matches)); } } @@ -378,8 +407,10 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b mergeSingleState(local_state); } -void AggregationOperationState::aggregateBlockHashTable(const block_id input_block) { - BlockReference block(storage_manager_->getBlock(input_block, input_relation_)); +void AggregationOperationState::aggregateBlockHashTable( + const block_id input_block) { + BlockReference block( + storage_manager_->getBlock(input_block, input_relation_)); // If there is a filter predicate, 'reuse_matches' holds the set of matching // tuples so that it can be reused across multiple aggregates (i.e. we only @@ -391,11 +422,10 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo // GROUP BY expressions once). std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors; - for (std::size_t agg_idx = 0; - agg_idx < handles_.size(); - ++agg_idx) { + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { if (is_distinct_[agg_idx]) { - // Call StorageBlock::aggregateDistinct() to insert the GROUP BY expression + // Call StorageBlock::aggregateDistinct() to insert the GROUP BY + // expression // values and the aggregation arguments together as keys directly into the // (threadsafe) shared global distinctify HashTable for this aggregate. 
block->aggregateDistinct(*handles_[agg_idx], @@ -406,45 +436,54 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo distinctify_hashtables_[agg_idx].get(), &reuse_matches, &reuse_group_by_vectors); - } else { - // Call StorageBlock::aggregateGroupBy() to aggregate this block's values - // directly into the (threadsafe) shared global HashTable for this - // aggregate. - DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr); - AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable(); - DCHECK(agg_hash_table != nullptr); - block->aggregateGroupBy(*handles_[agg_idx], - arguments_[agg_idx], - group_by_list_, - predicate_.get(), - agg_hash_table, - &reuse_matches, - &reuse_group_by_vectors); - group_by_hashtable_pools_[agg_idx]->returnHashTable(agg_hash_table); } } + + // Call StorageBlock::aggregateGroupBy() to aggregate this block's values + // directly into the (threadsafe) shared global HashTable for this + // aggregate. + DCHECK(group_by_hashtable_pool_ != nullptr); + AggregationStateHashTableBase *agg_hash_table = + group_by_hashtable_pool_->getHashTableFast(); + DCHECK(agg_hash_table != nullptr); + block->aggregateGroupBy(arguments_, + group_by_list_, + predicate_.get(), + agg_hash_table, + &reuse_matches, + &reuse_group_by_vectors); + group_by_hashtable_pool_->returnHashTable(agg_hash_table); } -void AggregationOperationState::finalizeSingleState(InsertDestination *output_destination) { +void AggregationOperationState::finalizeSingleState( + InsertDestination *output_destination) { // Simply build up a Tuple from the finalized values for each aggregate and // insert it in '*output_destination'. 
std::vector<TypedValue> attribute_values; - for (std::size_t agg_idx = 0; - agg_idx < handles_.size(); - ++agg_idx) { + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { if (is_distinct_[agg_idx]) { single_states_[agg_idx].reset( - handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(*distinctify_hashtables_[agg_idx])); + handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle( + *distinctify_hashtables_[agg_idx])); } - attribute_values.emplace_back(handles_[agg_idx]->finalize(*single_states_[agg_idx])); + attribute_values.emplace_back( + handles_[agg_idx]->finalize(*single_states_[agg_idx])); } output_destination->insertTuple(Tuple(std::move(attribute_values))); } -void AggregationOperationState::finalizeHashTable(InsertDestination *output_destination) { +void AggregationOperationState::mergeGroupByHashTables( + AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) { + HashTableMergerFast merger(dst); + (static_cast<FastHashTable<true, false, true, false> *>(src)) + ->forEachCompositeKeyFast(&merger); +} + +void AggregationOperationState::finalizeHashTable( + InsertDestination *output_destination) { // Each element of 'group_by_keys' is a vector of values for a particular // group (which is also the prefix of the finalized Tuple for that group). std::vector<std::vector<TypedValue>> group_by_keys; @@ -455,60 +494,57 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest // TODO(harshad) - Find heuristics for faster merge, even in a single thread. // e.g. Keep merging entries from smaller hash tables to larger. - for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { - auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables(); - if (hash_tables->size() > 1) { - for (int hash_table_index = 0; - hash_table_index < static_cast<int>(hash_tables->size() - 1); - ++hash_table_index) { - // Merge each hash table to the last hash table. 
- handles_[agg_idx]->mergeGroupByHashTables( - (*(*hash_tables)[hash_table_index]), - hash_tables->back().get()); - } + + auto *hash_tables = group_by_hashtable_pool_->getAllHashTables(); + if (hash_tables->size() > 1) { + for (int hash_table_index = 0; + hash_table_index < static_cast<int>(hash_tables->size() - 1); + ++hash_table_index) { + // Merge each hash table to the last hash table. + mergeGroupByHashTables((*hash_tables)[hash_table_index].get(), + hash_tables->back().get()); } } // Collect per-aggregate finalized values. std::vector<std::unique_ptr<ColumnVector>> final_values; - for (std::size_t agg_idx = 0; - agg_idx < handles_.size(); - ++agg_idx) { + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { if (is_distinct_[agg_idx]) { - DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr); - auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables(); + DCHECK(group_by_hashtable_pool_ != nullptr); + auto *hash_tables = group_by_hashtable_pool_->getAllHashTables(); DCHECK(hash_tables != nullptr); if (hash_tables->empty()) { // We may have a case where hash_tables is empty, e.g. no input blocks. // However for aggregateOnDistinctifyHashTableForGroupBy to work // correctly, we should create an empty group by hash table. 
- AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable(); - group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table); - hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables(); + AggregationStateHashTableBase *new_hash_table = + group_by_hashtable_pool_->getHashTableFast(); + group_by_hashtable_pool_->returnHashTable(new_hash_table); + hash_tables = group_by_hashtable_pool_->getAllHashTables(); } DCHECK(hash_tables->back() != nullptr); AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get(); DCHECK(agg_hash_table != nullptr); + handles_[agg_idx]->allowUpdate(); handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy( - *distinctify_hashtables_[agg_idx], - agg_hash_table); + *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx); } - auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables(); + auto *hash_tables = group_by_hashtable_pool_->getAllHashTables(); DCHECK(hash_tables != nullptr); if (hash_tables->empty()) { // We may have a case where hash_tables is empty, e.g. no input blocks. // However for aggregateOnDistinctifyHashTableForGroupBy to work // correctly, we should create an empty group by hash table. 
- AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable(); - group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table); - hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables(); + AggregationStateHashTableBase *new_hash_table = + group_by_hashtable_pool_->getHashTable(); + group_by_hashtable_pool_->returnHashTable(new_hash_table); + hash_tables = group_by_hashtable_pool_->getAllHashTables(); } AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get(); DCHECK(agg_hash_table != nullptr); - ColumnVector* agg_result_col = - handles_[agg_idx]->finalizeHashTable(*agg_hash_table, - &group_by_keys); + ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable( + *agg_hash_table, &group_by_keys, agg_idx); if (agg_result_col != nullptr) { final_values.emplace_back(agg_result_col); } @@ -526,16 +562,20 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) { const Type &group_by_type = group_by_element->getType(); if (NativeColumnVector::UsableForType(group_by_type)) { - NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size()); + NativeColumnVector *element_cv = + new NativeColumnVector(group_by_type, group_by_keys.size()); group_by_cvs.emplace_back(element_cv); for (std::vector<TypedValue> &group_key : group_by_keys) { - element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); + element_cv->appendTypedValue( + std::move(group_key[group_by_element_idx])); } } else { - IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size()); + IndirectColumnVector *element_cv = + new IndirectColumnVector(group_by_type, group_by_keys.size()); group_by_cvs.emplace_back(element_cv); for (std::vector<TypedValue> &group_key : group_by_keys) { - 
element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); + element_cv->appendTypedValue( + std::move(group_key[group_by_element_idx])); } } ++group_by_element_idx;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index ecd116b..cbbfc22 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -102,16 +102,17 @@ class AggregationOperationState { * tables. Single aggregation state (when GROUP BY list is not * specified) is not allocated using memory from storage manager. */ - AggregationOperationState(const CatalogRelationSchema &input_relation, - const std::vector<const AggregateFunction*> &aggregate_functions, - std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments, - std::vector<bool> &&is_distinct, - std::vector<std::unique_ptr<const Scalar>> &&group_by, - const Predicate *predicate, - const std::size_t estimated_num_entries, - const HashTableImplType hash_table_impl_type, - const std::vector<HashTableImplType> &distinctify_hash_table_impl_types, - StorageManager *storage_manager); + AggregationOperationState( + const CatalogRelationSchema &input_relation, + const std::vector<const AggregateFunction *> &aggregate_functions, + std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments, + std::vector<bool> &&is_distinct, + std::vector<std::unique_ptr<const Scalar>> &&group_by, + const Predicate *predicate, + const std::size_t estimated_num_entries, + const HashTableImplType hash_table_impl_type, + const std::vector<HashTableImplType> &distinctify_hash_table_impl_types, + StorageManager *storage_manager); ~AggregationOperationState() {} @@ -143,8 +144,9 @@ class AggregationOperationState { * in. * @return Whether proto is fully-formed and valid. 
**/ - static bool ProtoIsValid(const serialization::AggregationOperationState &proto, - const CatalogDatabaseLite &database); + static bool ProtoIsValid( + const serialization::AggregationOperationState &proto, + const CatalogDatabaseLite &database); /** * @brief Compute aggregates on the tuples of the given storage block, @@ -165,10 +167,16 @@ class AggregationOperationState { **/ void finalizeAggregate(InsertDestination *output_destination); + static void mergeGroupByHashTables(AggregationStateHashTableBase *src, + AggregationStateHashTableBase *dst); + + int dflag; + private: // Merge locally (per storage block) aggregated states with global aggregation // states. - void mergeSingleState(const std::vector<std::unique_ptr<AggregationState>> &local_state); + void mergeSingleState( + const std::vector<std::unique_ptr<AggregationState>> &local_state); // Aggregate on input block. void aggregateBlockSingleState(const block_id input_block); @@ -185,7 +193,8 @@ class AggregationOperationState { // Each individual aggregate in this operation has an AggregationHandle and // some number of Scalar arguments. - std::vector<std::unique_ptr<AggregationHandle>> handles_; + // std::vector<std::unique_ptr<AggregationHandle>> handles_; + std::vector<AggregationHandle *> handles_; std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_; // For each aggregate, whether DISTINCT should be applied to the aggregate's @@ -193,7 +202,8 @@ class AggregationOperationState { std::vector<bool> is_distinct_; // Hash table for obtaining distinct (i.e. unique) arguments. 
- std::vector<std::unique_ptr<AggregationStateHashTableBase>> distinctify_hashtables_; + std::vector<std::unique_ptr<AggregationStateHashTableBase>> + distinctify_hashtables_; #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION // If all an aggregate's argument expressions are simply attributes in @@ -208,10 +218,11 @@ class AggregationOperationState { // // TODO(shoban): We should ideally store the aggregation state together in one // hash table to prevent multiple lookups. - std::vector<std::unique_ptr<AggregationStateHashTableBase>> group_by_hashtables_; + std::vector<std::unique_ptr<AggregationStateHashTableBase>> + group_by_hashtables_; - // A vector of group by hash table pools, one for each group by clause. - std::vector<std::unique_ptr<HashTablePool>> group_by_hashtable_pools_; + // A single pool of group by hash tables, shared by all aggregates. + std::unique_ptr<HashTablePool> group_by_hashtable_pool_; StorageManager *storage_manager_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ac3512ce/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index 65a7975..f05cc46 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -198,6 +198,9 @@ if (ENABLE_DISTRIBUTED) endif() add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp) +add_library(quickstep_storage_FastHashTable ../empty_src.cpp FastHashTable.hpp) +add_library(quickstep_storage_FastHashTableFactory ../empty_src.cpp FastHashTableFactory.hpp) +add_library(quickstep_storage_FastSeparateChainingHashTable ../empty_src.cpp FastSeparateChainingHashTable.hpp) add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp) if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS) add_library(quickstep_storage_FileManagerHdfs FileManagerHdfs.cpp FileManagerHdfs.hpp) @@ -626,6 +629,53 @@ target_link_libraries(quickstep_storage_EvictionPolicy quickstep_threading_SpinMutex 
quickstep_threading_SpinSharedMutex quickstep_utility_Macros) +target_link_libraries(quickstep_storage_FastHashTable + quickstep_catalog_CatalogTypedefs + quickstep_storage_HashTableBase + quickstep_storage_StorageBlob + quickstep_storage_StorageBlockInfo + quickstep_storage_StorageConstants + quickstep_storage_StorageManager + quickstep_storage_TupleReference + quickstep_storage_ValueAccessor + quickstep_storage_ValueAccessorUtil + quickstep_threading_SpinMutex + quickstep_threading_SpinSharedMutex + quickstep_types_Type + quickstep_types_TypedValue + quickstep_utility_BloomFilter + quickstep_utility_HashPair + quickstep_utility_Macros) +target_link_libraries(quickstep_storage_FastHashTableFactory + glog + quickstep_storage_FastHashTable + quickstep_storage_FastSeparateChainingHashTable + quickstep_storage_HashTable + quickstep_storage_HashTable_proto + quickstep_storage_HashTableBase + quickstep_storage_HashTableFactory + quickstep_storage_LinearOpenAddressingHashTable + quickstep_storage_SeparateChainingHashTable + quickstep_storage_SimpleScalarSeparateChainingHashTable + quickstep_storage_TupleReference + quickstep_types_TypeFactory + quickstep_utility_BloomFilter + quickstep_utility_Macros) +target_link_libraries(quickstep_storage_FastSeparateChainingHashTable + quickstep_storage_FastHashTable + quickstep_storage_HashTable + quickstep_storage_HashTableBase + quickstep_storage_HashTableKeyManager + quickstep_storage_StorageBlob + quickstep_storage_StorageBlockInfo + quickstep_storage_StorageConstants + quickstep_storage_StorageManager + quickstep_threading_SpinSharedMutex + quickstep_types_Type + quickstep_types_TypedValue + quickstep_utility_Alignment + quickstep_utility_Macros + quickstep_utility_PrimeNumber) target_link_libraries(quickstep_storage_FileManager quickstep_storage_StorageBlockInfo quickstep_utility_Macros @@ -711,6 +761,8 @@ target_link_libraries(quickstep_storage_HashTableKeyManager target_link_libraries(quickstep_storage_HashTablePool glog 
quickstep_expressions_aggregation_AggregationHandle + quickstep_storage_FastHashTable + quickstep_storage_FastHashTableFactory quickstep_storage_HashTableBase quickstep_threading_SpinMutex quickstep_utility_Macros @@ -1098,6 +1150,9 @@ target_link_libraries(quickstep_storage quickstep_storage_EvictionPolicy quickstep_storage_FileManager quickstep_storage_FileManagerLocal + quickstep_storage_FastHashTable + quickstep_storage_FastHashTableFactory + quickstep_storage_FastSeparateChainingHashTable quickstep_storage_HashTable quickstep_storage_HashTable_proto quickstep_storage_HashTableBase