Repository: incubator-quickstep Updated Branches: refs/heads/partitioned-aggregate-new f7183fcff -> 243e84e57 (forced update)
Support for performing partitioned aggregation. - Added a PartitionedHashTablePool, used for creating a pool of hash tables such that each hash table belongs to a unique partition. - The partitioning is done on the group-by keys. - Wrote a utility function to compute the composite hash of a group of TypedValues. - Added a check for whether the aggregation is partitioned or not. - The aggregation can be partitioned only if all of the following hold: 1. The query has a GROUP BY clause. 2. There are no aggregations with a DISTINCT clause. 3. The estimated number of groups is greater than a pre-defined threshold. - Added a method for partitioned aggregation with GROUP BY: - StorageBlock now provides a method for performing GROUP BY aggregation in a partitioned way. - The Tuple class now supports a method to compute the hash of the entire tuple (i.e., the hash key is the composite key made up of all the attributes in the tuple). - AggregationOperationState calls the appropriate method (i.e., aggregateGroupBy or aggregateGroupByPartitioned) depending on whether the aggregation is partitioned. - Set the estimated size of each aggregation hash table to 262144 entries, chosen to match the L2 cache size. 
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/243e84e5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/243e84e5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/243e84e5 Branch: refs/heads/partitioned-aggregate-new Commit: 243e84e5758449a4134b2b357f0d093f4ecf52ca Parents: ac3512c Author: Harshad Deshmukh <hbdeshm...@apache.org> Authored: Wed Sep 21 11:43:39 2016 -0500 Committer: Harshad Deshmukh <hbdeshm...@apache.org> Committed: Thu Sep 22 15:16:33 2016 -0500 ---------------------------------------------------------------------- .../FinalizeAggregationOperator.cpp | 35 +++- .../FinalizeAggregationOperator.hpp | 9 +- storage/AggregationOperationState.cpp | 135 +++++++++--- storage/AggregationOperationState.hpp | 47 +++++ storage/CMakeLists.txt | 12 ++ storage/HashTablePool.hpp | 8 +- storage/PartitionedHashTablePool.hpp | 206 +++++++++++++++++++ storage/StorageBlock.cpp | 115 +++++++++++ storage/StorageBlock.hpp | 43 ++++ types/containers/CMakeLists.txt | 1 + types/containers/Tuple.hpp | 8 + utility/CMakeLists.txt | 6 + utility/CompositeHash.hpp | 52 +++++ 13 files changed, 637 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/243e84e5/relational_operators/FinalizeAggregationOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp index 7e337de..55d1357 100644 --- a/relational_operators/FinalizeAggregationOperator.cpp +++ b/relational_operators/FinalizeAggregationOperator.cpp @@ -41,12 +41,29 @@ bool FinalizeAggregationOperator::getAllWorkOrders( if (blocking_dependencies_met_ && !started_) { started_ = true; - container->addNormalWorkOrder( - new 
FinalizeAggregationWorkOrder( - query_id_, - query_context->getAggregationState(aggr_state_index_), - query_context->getInsertDestination(output_destination_index_)), - op_index_); + DCHECK(query_context->getAggregationState(aggr_state_index_) != nullptr); + if (query_context->getAggregationState(aggr_state_index_)->isAggregatePartitioned()) { + // The same AggregationState is shared across all the WorkOrders. + for (std::size_t part_id = 0; + part_id < query_context->getAggregationState(aggr_state_index_) + ->getNumPartitions(); + ++part_id) { + container->addNormalWorkOrder( + new FinalizeAggregationWorkOrder( + query_id_, + query_context->getAggregationState(aggr_state_index_), + query_context->getInsertDestination(output_destination_index_), + static_cast<int>(part_id)), + op_index_); + } + } else { + container->addNormalWorkOrder( + new FinalizeAggregationWorkOrder( + query_id_, + query_context->getAggregationState(aggr_state_index_), + query_context->getInsertDestination(output_destination_index_)), + op_index_); + } } return started_; } @@ -70,7 +87,11 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer void FinalizeAggregationWorkOrder::execute() { - state_->finalizeAggregate(output_destination_); + if (state_->isAggregatePartitioned()) { + state_->finalizeAggregatePartitioned(part_id_, output_destination_); + } else { + state_->finalizeAggregate(output_destination_); + } } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/243e84e5/relational_operators/FinalizeAggregationOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp index 0aeac2a..ae7127a 100644 --- a/relational_operators/FinalizeAggregationOperator.hpp +++ b/relational_operators/FinalizeAggregationOperator.hpp @@ -119,13 +119,17 @@ class FinalizeAggregationWorkOrder : public 
WorkOrder { * @param state The AggregationState to use. * @param output_destination The InsertDestination to insert aggregation * results. + * @param part_id The partition ID for which the Finalize aggregation work + * order is issued. Ignore if aggregation is not partitioned. */ FinalizeAggregationWorkOrder(const std::size_t query_id, AggregationOperationState *state, - InsertDestination *output_destination) + InsertDestination *output_destination, + const int part_id = -1) : WorkOrder(query_id), state_(DCHECK_NOTNULL(state)), - output_destination_(DCHECK_NOTNULL(output_destination)) {} + output_destination_(DCHECK_NOTNULL(output_destination)), + part_id_(part_id) {} ~FinalizeAggregationWorkOrder() override {} @@ -134,6 +138,7 @@ class FinalizeAggregationWorkOrder : public WorkOrder { private: AggregationOperationState *state_; InsertDestination *output_destination_; + const int part_id_; DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/243e84e5/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index 073b813..1db42ef 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -69,6 +69,8 @@ AggregationOperationState::AggregationOperationState( const std::vector<HashTableImplType> &distinctify_hash_table_impl_types, StorageManager *storage_manager) : input_relation_(input_relation), + is_aggregate_partitioned_(checkAggregatePartitioned( + estimated_num_entries, is_distinct, group_by)), predicate_(predicate), group_by_list_(std::move(group_by)), arguments_(std::move(arguments)), @@ -166,18 +168,16 @@ AggregationOperationState::AggregationOperationState( } // Initialize the corresponding distinctify hash table if this is a - // DISTINCT - // aggregation. + // DISTINCT aggregation. 
if (*is_distinct_it) { std::vector<const Type *> key_types(group_by_types); key_types.insert( key_types.end(), argument_types.begin(), argument_types.end()); // TODO(jianqiao): estimated_num_entries is quite inaccurate for - // estimating - // the number of entries in the distinctify hash table. We may estimate - // for each distinct aggregation an estimated_num_distinct_keys value - // during - // query optimization, if it worths. + // estimating the number of entries in the distinctify hash table. + // We may estimate for each distinct aggregation an + // estimated_num_distinct_keys value during query optimization, if it's + // worth. distinctify_hashtables_.emplace_back( AggregationStateFastHashTableFactory::CreateResizable( *distinctify_hash_table_impl_types_it, @@ -193,14 +193,24 @@ AggregationOperationState::AggregationOperationState( } if (!group_by_handles.empty()) { - // Aggregation with GROUP BY: create a HashTable pool for per-group - // states. - group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries, - hash_table_impl_type, - group_by_types, - payload_sizes, - group_by_handles, - storage_manager)); + // Aggregation with GROUP BY: create a HashTable pool. + if (!is_aggregate_partitioned_) { + group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries, + hash_table_impl_type, + group_by_types, + payload_sizes, + group_by_handles, + storage_manager)); + } else { + partitioned_group_by_hashtable_pool_.reset( + new PartitionedHashTablePool(estimated_num_entries, + kNumPartitionsForAggregate, + hash_table_impl_type, + group_by_types, + payload_sizes, + group_by_handles, + storage_manager)); + } } } } @@ -439,20 +449,30 @@ void AggregationOperationState::aggregateBlockHashTable( } } - // Call StorageBlock::aggregateGroupBy() to aggregate this block's values - // directly into the (threadsafe) shared global HashTable for this - // aggregate. 
- DCHECK(group_by_hashtable_pool_ != nullptr); - AggregationStateHashTableBase *agg_hash_table = + if (!is_aggregate_partitioned_) { + // Call StorageBlock::aggregateGroupBy() to aggregate this block's values + // directly into the (threadsafe) shared global HashTable for this + // aggregate. + DCHECK(group_by_hashtable_pool_ != nullptr); + AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pool_->getHashTableFast(); - DCHECK(agg_hash_table != nullptr); - block->aggregateGroupBy(arguments_, - group_by_list_, - predicate_.get(), - agg_hash_table, - &reuse_matches, - &reuse_group_by_vectors); - group_by_hashtable_pool_->returnHashTable(agg_hash_table); + DCHECK(agg_hash_table != nullptr); + block->aggregateGroupBy(arguments_, + group_by_list_, + predicate_.get(), + agg_hash_table, + &reuse_matches, + &reuse_group_by_vectors); + group_by_hashtable_pool_->returnHashTable(agg_hash_table); + } else { + block->aggregateGroupByPartitioned( + arguments_, + group_by_list_, + predicate_.get(), + &reuse_matches, + &reuse_group_by_vectors, + partitioned_group_by_hashtable_pool_.get()); + } } void AggregationOperationState::finalizeSingleState( @@ -595,4 +615,63 @@ void AggregationOperationState::finalizeHashTable( output_destination->bulkInsertTuples(&complete_result); } +void AggregationOperationState::finalizeAggregatePartitioned( + const std::size_t partition_id, InsertDestination *output_destination) { + // Each element of 'group_by_keys' is a vector of values for a particular + // group (which is also the prefix of the finalized Tuple for that group). + std::vector<std::vector<TypedValue>> group_by_keys; + + // Collect per-aggregate finalized values. 
+ std::vector<std::unique_ptr<ColumnVector>> final_values; + for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { + AggregationStateHashTableBase *hash_table = + partitioned_group_by_hashtable_pool_->getHashTable(partition_id); + ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable( + *hash_table, &group_by_keys, agg_idx); + if (agg_result_col != nullptr) { + final_values.emplace_back(agg_result_col); + } + } + + // Reorganize 'group_by_keys' in column-major order so that we can make a + // ColumnVectorsValueAccessor to bulk-insert results. + // + // TODO(chasseur): Shuffling around the GROUP BY keys like this is suboptimal + // if there is only one aggregate. The need to do this should hopefully go + // away when we work out storing composite structures for multiple aggregates + // in a single HashTable. + std::vector<std::unique_ptr<ColumnVector>> group_by_cvs; + std::size_t group_by_element_idx = 0; + for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) { + const Type &group_by_type = group_by_element->getType(); + if (NativeColumnVector::UsableForType(group_by_type)) { + NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size()); + group_by_cvs.emplace_back(element_cv); + for (std::vector<TypedValue> &group_key : group_by_keys) { + element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); + } + } else { + IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size()); + group_by_cvs.emplace_back(element_cv); + for (std::vector<TypedValue> &group_key : group_by_keys) { + element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); + } + } + ++group_by_element_idx; + } + + // Stitch together a ColumnVectorsValueAccessor combining the GROUP BY keys + // and the finalized aggregates. 
+ ColumnVectorsValueAccessor complete_result; + for (std::unique_ptr<ColumnVector> &group_by_cv : group_by_cvs) { + complete_result.addColumn(group_by_cv.release()); + } + for (std::unique_ptr<ColumnVector> &final_value_cv : final_values) { + complete_result.addColumn(final_value_cv.release()); + } + + // Bulk-insert the complete result. + output_destination->bulkInsertTuples(&complete_result); +} + } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/243e84e5/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index cbbfc22..a5779d2 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -32,6 +32,7 @@ #include "storage/AggregationOperationState.pb.h" #include "storage/HashTableBase.hpp" #include "storage/HashTablePool.hpp" +#include "storage/PartitionedHashTablePool.hpp" #include "storage/StorageBlockInfo.hpp" #include "utility/Macros.hpp" @@ -167,12 +168,31 @@ class AggregationOperationState { **/ void finalizeAggregate(InsertDestination *output_destination); + void finalizeAggregatePartitioned( + const std::size_t partition_id, InsertDestination *output_destination); + static void mergeGroupByHashTables(AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst); + bool isAggregatePartitioned() const { + return is_aggregate_partitioned_; + } + + /** + * @note This method is relevant only when the aggregate is partitioned. + **/ + std::size_t getNumPartitions() const { + return is_aggregate_partitioned_ + ? partitioned_group_by_hashtable_pool_->getNumPartitions() + : 1; + } + int dflag; private: + static constexpr std::size_t kPartitionedAggregateThreshold = 100; + static constexpr std::size_t kNumPartitionsForAggregate = 40; + // Merge locally (per storage block) aggregated states with global aggregation // states. 
void mergeSingleState( @@ -185,9 +205,34 @@ class AggregationOperationState { void finalizeSingleState(InsertDestination *output_destination); void finalizeHashTable(InsertDestination *output_destination); + bool checkAggregatePartitioned( + const std::size_t estimated_num_groups, + const std::vector<bool> &is_distinct, + const std::vector<std::unique_ptr<const Scalar>> &group_by) const { + // Check if there's a distinct operation involved in any aggregate, if so + // the aggregate can't be partitioned. + for (auto distinct : is_distinct) { + if (distinct) { + return false; + } + } + // There's no distinct aggregation involved, Check if there's at least one + // GROUP BY operation. + if (group_by.empty()) { + return false; + } + // There are GROUP BYs without DISTINCT. Check if the estimated number of + // groups is large enough to warrant a partitioned aggregation. + return estimated_num_groups > kPartitionedAggregateThreshold; + } + // Common state for all aggregates in this operation: the input relation, the // filter predicate (if any), and the list of GROUP BY expressions (if any). const CatalogRelationSchema &input_relation_; + + // Whether the aggregation is partitioned or not. + const bool is_aggregate_partitioned_; + std::unique_ptr<const Predicate> predicate_; std::vector<std::unique_ptr<const Scalar>> group_by_list_; @@ -224,6 +269,8 @@ class AggregationOperationState { // A vector of group by hash table pools. 
std::unique_ptr<HashTablePool> group_by_hashtable_pool_; + std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_; + StorageManager *storage_manager_; DISALLOW_COPY_AND_ASSIGN(AggregationOperationState); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/243e84e5/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index f05cc46..9c1ebd0 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -235,6 +235,7 @@ add_library(quickstep_storage_PackedRowStoreTupleStorageSubBlock add_library(quickstep_storage_PackedRowStoreValueAccessor ../empty_src.cpp PackedRowStoreValueAccessor.hpp) +add_library(quickstep_storage_PartitionedHashTablePool ../empty_src.cpp PartitionedHashTablePool.hpp) add_library(quickstep_storage_PreloaderThread PreloaderThread.cpp PreloaderThread.hpp) add_library(quickstep_storage_SMAIndexSubBlock SMAIndexSubBlock.cpp SMAIndexSubBlock.hpp) add_library(quickstep_storage_SeparateChainingHashTable ../empty_src.cpp SeparateChainingHashTable.hpp) @@ -289,6 +290,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState quickstep_storage_HashTableFactory quickstep_storage_HashTablePool quickstep_storage_InsertDestination + quickstep_storage_PartitionedHashTablePool quickstep_storage_StorageBlock quickstep_storage_StorageBlockInfo quickstep_storage_StorageManager @@ -850,6 +852,14 @@ target_link_libraries(quickstep_storage_PackedRowStoreValueAccessor quickstep_types_TypedValue quickstep_utility_BitVector quickstep_utility_Macros) +target_link_libraries(quickstep_storage_PartitionedHashTablePool + glog + quickstep_expressions_aggregation_AggregationHandle + quickstep_storage_FastHashTable + quickstep_storage_FastHashTableFactory + quickstep_storage_HashTableBase + quickstep_utility_Macros + quickstep_utility_StringUtil) target_link_libraries(quickstep_storage_PreloaderThread glog 
quickstep_catalog_CatalogDatabase @@ -971,6 +981,7 @@ target_link_libraries(quickstep_storage_StorageBlock quickstep_storage_IndexSubBlock quickstep_storage_InsertDestinationInterface quickstep_storage_PackedRowStoreTupleStorageSubBlock + quickstep_storage_PartitionedHashTablePool quickstep_storage_SMAIndexSubBlock quickstep_storage_SplitRowStoreTupleStorageSubBlock quickstep_storage_StorageBlockBase @@ -1167,6 +1178,7 @@ target_link_libraries(quickstep_storage quickstep_storage_LinearOpenAddressingHashTable quickstep_storage_PackedRowStoreTupleStorageSubBlock quickstep_storage_PackedRowStoreValueAccessor + quickstep_storage_PartitionedHashTablePool quickstep_storage_PreloaderThread quickstep_storage_SMAIndexSubBlock quickstep_storage_SeparateChainingHashTable http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/243e84e5/storage/HashTablePool.hpp ---------------------------------------------------------------------- diff --git a/storage/HashTablePool.hpp b/storage/HashTablePool.hpp index 3cdfcb3..84e1b42 100644 --- a/storage/HashTablePool.hpp +++ b/storage/HashTablePool.hpp @@ -198,15 +198,17 @@ class HashTablePool { inline std::size_t reduceEstimatedCardinality( const std::size_t original_estimate) const { - if (original_estimate < kEstimateReductionFactor) { + /*if (original_estimate < kEstimateReductionFactor) { return original_estimate; } else { DCHECK_GT(kEstimateReductionFactor, 0u); return original_estimate / kEstimateReductionFactor; - } + }*/ + return kEstimateReductionFactor; } - static constexpr std::size_t kEstimateReductionFactor = 100; + // L2 cache size. 
+ static constexpr std::size_t kEstimateReductionFactor = 262144; std::vector<std::unique_ptr<AggregationStateHashTableBase>> hash_tables_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/243e84e5/storage/PartitionedHashTablePool.hpp ---------------------------------------------------------------------- diff --git a/storage/PartitionedHashTablePool.hpp b/storage/PartitionedHashTablePool.hpp new file mode 100644 index 0000000..184bddd --- /dev/null +++ b/storage/PartitionedHashTablePool.hpp @@ -0,0 +1,206 @@ +/** + * Copyright 2016, Quickstep Research Group, Computer Sciences Department, + * University of WisconsinâMadison. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#ifndef QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_ +#define QUICKSTEP_STORAGE_PARTITIONED_HASH_TABLE_POOL_HPP_ + +#include <chrono> +#include <memory> +#include <utility> +#include <vector> + +#include "expressions/aggregation/AggregationHandle.hpp" +#include "storage/HashTableBase.hpp" +#include "storage/FastHashTable.hpp" +#include "storage/FastHashTableFactory.hpp" +#include "utility/Macros.hpp" +#include "utility/StringUtil.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +class StorageManager; +class Type; + +/** \addtogroup Storage + * @{ + */ + +/** + * @brief A pool of HashTables used for a single aggregation handle. Each + * HashTable represents values from a given partition, which is + * determined by the keys in the group by clause. 
+ **/ +class PartitionedHashTablePool { + public: + /** + * @brief Constructor. + * + * @param estimated_num_entries The maximum number of entries in a hash table. + * @param num_partitions The number of partitions (i.e. number of HashTables) + * @param hash_table_impl_type The type of hash table implementation. + * @param group_by_types A vector of pointer of types which form the group by + * key. + * @param agg_handle The aggregation handle. + * @param storage_manager A pointer to the storage manager. + * + * @note The estimate of number of entries is quite inaccurate at this time. + * If we go by the current estimate, each hash table demands much + * larger space than it actually needs, which causes the system to + * either trigger evictions or worse - run out of memory. To fix this + * issue, we divide the estimate by 100. The division will not affect + * correctness, however it may allocate some hash tables smaller space + * than their requirement, causing them to be resized during build + * phase, which has a performance penalty. 
+ **/ + PartitionedHashTablePool(const std::size_t estimated_num_entries, + const std::size_t num_partitions, + const HashTableImplType hash_table_impl_type, + const std::vector<const Type *> &group_by_types, + AggregationHandle *agg_handle, + StorageManager *storage_manager) + : estimated_num_entries_( + reduceEstimatedCardinality(estimated_num_entries, num_partitions)), + num_partitions_(num_partitions), + hash_table_impl_type_(hash_table_impl_type), + group_by_types_(group_by_types), + agg_handle_(DCHECK_NOTNULL(agg_handle)), + storage_manager_(DCHECK_NOTNULL(storage_manager)) { + initializeAllHashTables(); + } + + PartitionedHashTablePool(const std::size_t estimated_num_entries, + const std::size_t num_partitions, + const HashTableImplType hash_table_impl_type, + const std::vector<const Type *> &group_by_types, + const std::vector<std::size_t> &payload_sizes, + const std::vector<AggregationHandle *> &handles, + StorageManager *storage_manager) + : estimated_num_entries_( + reduceEstimatedCardinality(estimated_num_entries, num_partitions)), + num_partitions_(num_partitions), + hash_table_impl_type_(hash_table_impl_type), + group_by_types_(group_by_types), + payload_sizes_(payload_sizes), + handles_(handles), + storage_manager_(DCHECK_NOTNULL(storage_manager)) { + initializeAllHashTables(); + } + + /** + * @brief Check out a hash table for insertion. + * + * @param partition_id The ID of the partitioned HashTable. + * + * @return A hash table pointer for the given HashTable. + **/ + AggregationStateHashTableBase* getHashTable(const std::size_t partition_id) { + DCHECK_LT(partition_id, num_partitions_); + DCHECK_LT(partition_id, hash_tables_.size()); + return hash_tables_[partition_id].get(); + } + + /** + * @brief Check out a hash table for insertion. + * + * @param partition_id The ID of the partitioned HashTable. + * + * @return A hash table pointer for the given HashTable. 
+ **/ + AggregationStateHashTableBase* getHashTableFast(const std::size_t partition_id) { + DCHECK_LT(partition_id, num_partitions_); + DCHECK_LT(partition_id, hash_tables_.size()); + return hash_tables_[partition_id].get(); + } + + /** + * @brief Get all the hash tables from the pool. + * + * @warning The caller should ensure that this call is made when no hash table + * is being checked in or checked out from the pool. In other words + * the hash table pool is in read-only state. + * + * @param All the hash tables in the pool. + * + **/ + std::vector<std::unique_ptr<AggregationStateHashTableBase>>* + getAllHashTables() { + return &hash_tables_; + } + + inline std::size_t getNumPartitions() const { + return num_partitions_; + } + + private: + void initializeAllHashTables() { + for (std::size_t part_num = 0; part_num < num_partitions_; ++part_num) { + AggregationStateHashTableBase *part_hash_table = createNewHashTableFast(); + hash_tables_.push_back( + std::unique_ptr<AggregationStateHashTableBase>(part_hash_table)); + } + } + + AggregationStateHashTableBase* createNewHashTable() { + return agg_handle_->createGroupByHashTable(hash_table_impl_type_, + group_by_types_, + estimated_num_entries_, + storage_manager_); + } + + AggregationStateHashTableBase* createNewHashTableFast() { + return AggregationStateFastHashTableFactory::CreateResizable( + hash_table_impl_type_, + group_by_types_, + estimated_num_entries_, + payload_sizes_, + handles_, + storage_manager_); + } + + inline std::size_t reduceEstimatedCardinality( + const std::size_t original_estimate, + const std::size_t num_partitions) const { + CHECK_NE(num_partitions, 0u); + // return original_estimate / num_partitions; + return 262144; // 256 KB (L2 Cache size). 
+ } + + std::vector<std::unique_ptr<AggregationStateHashTableBase>> hash_tables_; + + const std::size_t estimated_num_entries_; + const std::size_t num_partitions_; + + const HashTableImplType hash_table_impl_type_; + + const std::vector<const Type *> group_by_types_; + + std::vector<std::size_t> payload_sizes_; + + AggregationHandle *agg_handle_; + const std::vector<AggregationHandle *> handles_; + StorageManager *storage_manager_; + + DISALLOW_COPY_AND_ASSIGN(PartitionedHashTablePool); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_STORAGE_HASH_TABLE_POOL_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/243e84e5/storage/StorageBlock.cpp ---------------------------------------------------------------------- diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp index ec5990f..94c46a8 100644 --- a/storage/StorageBlock.cpp +++ b/storage/StorageBlock.cpp @@ -41,6 +41,7 @@ #include "storage/IndexSubBlock.hpp" #include "storage/InsertDestinationInterface.hpp" #include "storage/PackedRowStoreTupleStorageSubBlock.hpp" +#include "storage/PartitionedHashTablePool.hpp" #include "storage/SMAIndexSubBlock.hpp" #include "storage/SplitRowStoreTupleStorageSubBlock.hpp" #include "storage/StorageBlockBase.hpp" @@ -1369,4 +1370,118 @@ const std::size_t StorageBlock::getNumTuples() const { return tuple_store_->numTuples(); } +void StorageBlock::aggregateGroupByPartitioned( + const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments, + const std::vector<std::unique_ptr<const Scalar>> &group_by, + const Predicate *predicate, + std::unique_ptr<TupleIdSequence> *reuse_matches, + std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors, + PartitionedHashTablePool *hashtable_pool) const { + DCHECK(!group_by.empty()) + << "Called aggregateGroupByPartitioned() with zero GROUP BY expressions"; + + SubBlocksReference sub_blocks_ref(*tuple_store_, + indices_, + indices_consistent_); + + // IDs of 'arguments' as 
attributes in the ValueAccessor we create below. + std::vector<attribute_id> arg_ids; + std::vector<attribute_id> argument_ids; + + // IDs of GROUP BY key element(s) in the ValueAccessor we create below. + std::vector<attribute_id> key_ids; + + // An intermediate ValueAccessor that stores the materialized 'arguments' for + // this aggregate, as well as the GROUP BY expression values. + ColumnVectorsValueAccessor temp_result; + std::unique_ptr<ValueAccessor> accessor; + if (predicate) { + if (!*reuse_matches) { + // If there is a filter predicate that hasn't already been evaluated, + // evaluate it now and save the results for other aggregates on this + // same block. + reuse_matches->reset(getMatchesForPredicate(predicate)); + } + + // Create a filtered ValueAccessor that only iterates over predicate + // matches. + accessor.reset(tuple_store_->createValueAccessor(reuse_matches->get())); + } else { + // Create a ValueAccessor that iterates over all tuples in this block + accessor.reset(tuple_store_->createValueAccessor()); + } + + attribute_id attr_id = 0; + + // First, put GROUP BY keys into 'temp_result'. + if (reuse_group_by_vectors->empty()) { + // Compute GROUP BY values from group_by Scalars, and store them in + // reuse_group_by_vectors for reuse by other aggregates on this same + // block. + reuse_group_by_vectors->reserve(group_by.size()); + for (const std::unique_ptr<const Scalar> &group_by_element : group_by) { + reuse_group_by_vectors->emplace_back( + group_by_element->getAllValues(accessor.get(), &sub_blocks_ref)); + temp_result.addColumn(reuse_group_by_vectors->back().get(), false); + key_ids.push_back(attr_id++); + } + } else { + // Reuse precomputed GROUP BY values from reuse_group_by_vectors. 
+ DCHECK_EQ(group_by.size(), reuse_group_by_vectors->size()) + << "Wrong number of reuse_group_by_vectors"; + for (const std::unique_ptr<ColumnVector> &reuse_cv : *reuse_group_by_vectors) { + temp_result.addColumn(reuse_cv.get(), false); + key_ids.push_back(attr_id++); + } + } + + // Compute argument vectors and add them to 'temp_result'. + for (const std::vector<std::unique_ptr<const Scalar>> &argument : arguments) { + arg_ids.clear(); + for (const std::unique_ptr<const Scalar> &args : argument) { + temp_result.addColumn(args->getAllValues(accessor.get(), &sub_blocks_ref)); + argument_ids.push_back(attr_id++); + } + if (argument.empty()) { + argument_ids.push_back(kInvalidAttributeID); + } + } + + // Compute the partitions for the tuple formed by group by values. + std::vector<std::unique_ptr<TupleIdSequence>> partition_membership; + partition_membership.resize(hashtable_pool->getNumPartitions()); + + // Create a tuple-id sequence for each partition. + for (std::size_t partition = 0; + partition < hashtable_pool->getNumPartitions(); + ++partition) { + partition_membership[partition].reset(new TupleIdSequence(temp_result.getEndPosition())); + } + + // Iterate over ValueAccessor for each tuple, + // set a bit in the appropriate TupleIdSequence. + temp_result.beginIteration(); + while (temp_result.next()) { + const std::size_t curr_tuple_partition_id = + temp_result.getTupleWithAttributes(key_ids)->getTupleHash() % + hashtable_pool->getNumPartitions(); + partition_membership[curr_tuple_partition_id]->set( + temp_result.getCurrentPosition(), true); + } + // For each partition, create an adapter around Value Accessor and + // TupleIdSequence. 
+ std::vector<std::unique_ptr< + TupleIdSequenceAdapterValueAccessor<ColumnVectorsValueAccessor>>> adapter; + adapter.resize(hashtable_pool->getNumPartitions()); + for (std::size_t partition = 0; + partition < hashtable_pool->getNumPartitions(); + ++partition) { + adapter[partition].reset(temp_result.createSharedTupleIdSequenceAdapter( + *partition_membership[partition])); + hashtable_pool->getHashTable(partition) + ->upsertValueAccessorCompositeKeyFast( + argument_ids, adapter[partition].get(), key_ids, true); + } +} + } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/243e84e5/storage/StorageBlock.hpp ---------------------------------------------------------------------- diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp index bab5bab..3fcaddf 100644 --- a/storage/StorageBlock.hpp +++ b/storage/StorageBlock.hpp @@ -44,6 +44,7 @@ class AggregationState; class CatalogRelationSchema; class ColumnVector; class InsertDestinationInterface; +class PartitionedHashTablePool; class Predicate; class Scalar; class StorageBlockLayout; @@ -466,6 +467,48 @@ class StorageBlock : public StorageBlockBase { std::unique_ptr<TupleIdSequence> *reuse_matches, std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors) const; + + /** + * @brief Perform the GROUP BY aggregation for the case when aggregation is + * partitioned. + * + * @note The difference between this method and the aggregateGroupBy method + * is that in this method, the tuples are routed to different HashTables + * based on the partition to which they belong to. The partition is + * determined by the GROUP BY attributes. Right now hash based + * partitioning is performed. + * + * @param arguments The arguments to the aggregation function as Scalars. + * @param group_by The list of GROUP BY attributes/expressions. The tuples in + * this storage block are grouped by these attributes before + * aggregation. + * @param predicate A predicate for selection. 
nullptr indicates that all + * tuples should be aggregated on. + * @param reuse_matches This parameter is used to store and reuse tuple-id + * sequence of matches pre-computed in an earlier invocation of + * aggregateGroupBy(). \c reuse_matches is never \c nullptr for ease of + * use. Current invocation of aggregateGroupBy() will reuse + * TupleIdSequence if passed, otherwise computes a TupleIdSequence based + * on \c predicate and stores it in \c reuse_matches. We use + * std::unique_ptr for ease of use, since the caller will not have to + * selectively free. + * @param reuse_group_by_vectors This parameter is used to store and reuse + * GROUP BY attribute vectors pre-computed in an earlier invocation of + * aggregateGroupBy(). \c reuse_group_by_vectors is never \c nullptr + * for ease of use. Current invocation of aggregateGroupBy() will reuse + * ColumnVectors if non-empty, otherwise computes ColumnVectors based + * on \c group_by and stores them in \c reuse_group_by_vectors. + * @param hashtable_pool The pool of aggregation HashTables. Each hash table + * in this pool belongs to a unique partition. + **/ + void aggregateGroupByPartitioned( + const std::vector<std::vector<std::unique_ptr<const Scalar>>> &arguments, + const std::vector<std::unique_ptr<const Scalar>> &group_by, + const Predicate *predicate, + std::unique_ptr<TupleIdSequence> *reuse_matches, + std::vector<std::unique_ptr<ColumnVector>> *reuse_group_by_vectors, + PartitionedHashTablePool *hashtable_pool) const; + + /** * @brief Inserts the GROUP BY expressions and aggregation arguments together * as keys into the distinctify hash table.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/243e84e5/types/containers/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/types/containers/CMakeLists.txt b/types/containers/CMakeLists.txt index aacb63a..c2a6623 100644 --- a/types/containers/CMakeLists.txt +++ b/types/containers/CMakeLists.txt @@ -49,6 +49,7 @@ target_link_libraries(quickstep_types_containers_Tuple quickstep_catalog_CatalogTypedefs quickstep_types_TypedValue quickstep_types_containers_Tuple_proto + quickstep_utility_CompositeHash quickstep_utility_Macros) target_link_libraries(quickstep_types_containers_Tuple_proto quickstep_types_TypedValue_proto http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/243e84e5/types/containers/Tuple.hpp ---------------------------------------------------------------------- diff --git a/types/containers/Tuple.hpp b/types/containers/Tuple.hpp index 60f832c..6237d54 100644 --- a/types/containers/Tuple.hpp +++ b/types/containers/Tuple.hpp @@ -28,6 +28,7 @@ #include "catalog/CatalogTypedefs.hpp" #include "types/TypedValue.hpp" #include "types/containers/Tuple.pb.h" +#include "utility/CompositeHash.hpp" #include "utility/Macros.hpp" #include "glog/logging.h" @@ -218,6 +219,13 @@ class Tuple { return attribute_values_.size(); } + /** + * @brief Get the composite hash of all attribute values in this tuple.
+ **/ + std::size_t getTupleHash() const { + return HashCompositeKey(attribute_values_); + } + private: /** * @brief Constructor which does not create any attributes, nor pre-reserve http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/243e84e5/utility/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt index ddaae45..4fb6e5b 100644 --- a/utility/CMakeLists.txt +++ b/utility/CMakeLists.txt @@ -167,6 +167,7 @@ add_library(quickstep_utility_BloomFilter_proto add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.cpp CalculateInstalledMemory.hpp) add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp) add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp) +add_library(quickstep_utility_CompositeHash ../empty_src.cpp CompositeHash.hpp) add_library(quickstep_utility_DAG ../empty_src.cpp DAG.hpp) add_library(quickstep_utility_EqualsAnyConstant ../empty_src.cpp EqualsAnyConstant.hpp) add_library(quickstep_utility_ExecutionDAGVisualizer @@ -227,6 +228,10 @@ target_link_libraries(quickstep_utility_CalculateInstalledMemory glog) target_link_libraries(quickstep_utility_CheckSnprintf glog) +target_link_libraries(quickstep_utility_CompositeHash + quickstep_types_TypedValue + quickstep_utility_HashPair + glog) target_link_libraries(quickstep_utility_DAG glog quickstep_utility_Macros) @@ -318,6 +323,7 @@ target_link_libraries(quickstep_utility quickstep_utility_CalculateInstalledMemory quickstep_utility_Cast quickstep_utility_CheckSnprintf + quickstep_utility_CompositeHash quickstep_utility_DAG quickstep_utility_EqualsAnyConstant quickstep_utility_ExecutionDAGVisualizer http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/243e84e5/utility/CompositeHash.hpp ---------------------------------------------------------------------- diff --git a/utility/CompositeHash.hpp b/utility/CompositeHash.hpp new file mode
100644 index 0000000..517bc96 --- /dev/null +++ b/utility/CompositeHash.hpp @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_ +#define QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_ + +#include <cstddef> +#include <vector> + +#include "types/TypedValue.hpp" +#include "utility/HashPair.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +/** + * @brief Compute the hash value of a composite key. + * + * @param key A non-empty vector of TypedValues forming the composite key. + * @return The combined hash of all values, folded left with CombineHashes(). + **/ +inline std::size_t HashCompositeKey(const std::vector<TypedValue> &key) { + DCHECK(!key.empty()); + std::size_t hash = key.front().getHash(); + for (std::vector<TypedValue>::const_iterator key_it = key.begin() + 1; + key_it != key.end(); + ++key_it) { + hash = CombineHashes(hash, key_it->getHash()); + } + return hash; +} + +} // namespace quickstep + +#endif // QUICKSTEP_UTILITY_COMPOSITE_HASH_HPP_