Repository: incubator-quickstep Updated Branches: refs/heads/collision-free-agg 60519429e -> dee650f64
Updates Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/dee650f6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/dee650f6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/dee650f6 Branch: refs/heads/collision-free-agg Commit: dee650f64d7209aff329b4e79dce8737ff229727 Parents: 6051942 Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Sat Feb 4 15:33:25 2017 -0600 Committer: Jianqiao Zhu <jianq...@cs.wisc.edu> Committed: Sat Feb 4 15:33:25 2017 -0600 ---------------------------------------------------------------------- .../aggregation/AggregationConcreteHandle.cpp | 2 +- query_optimizer/ExecutionGenerator.cpp | 24 +- query_optimizer/ExecutionGenerator.hpp | 14 +- .../FinalizeAggregationOperator.cpp | 2 +- .../InitializeAggregationOperator.cpp | 2 +- storage/AggregationOperationState.cpp | 98 ++++---- storage/AggregationOperationState.hpp | 34 ++- storage/CollisionFreeVectorTable.cpp | 28 ++- storage/CollisionFreeVectorTable.hpp | 231 ++++++++++++++----- storage/HashTableBase.hpp | 10 +- storage/HashTableFactory.hpp | 27 ++- storage/PackedPayloadHashTable.cpp | 2 +- storage/PackedPayloadHashTable.hpp | 213 ++++++++++++++++- storage/ValueAccessorMultiplexer.hpp | 42 ++++ 14 files changed, 570 insertions(+), 159 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/expressions/aggregation/AggregationConcreteHandle.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp index ca2e35d..73b826f 100644 --- a/expressions/aggregation/AggregationConcreteHandle.cpp +++ b/expressions/aggregation/AggregationConcreteHandle.cpp @@ -58,7 +58,7 @@ void 
AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable( } static_cast<PackedPayloadHashTable *>(distinctify_hash_table) - ->upsertValueAccessor({}, concatenated_ids, accessor_mux); + ->upsertValueAccessorCompositeKey({}, concatenated_ids, accessor_mux); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index c047b67..a512e6f 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -377,7 +377,7 @@ void ExecutionGenerator::dropAllTemporaryRelations() { bool ExecutionGenerator::canUseCollisionFreeAggregation( const P::AggregatePtr &aggregate, const std::size_t estimated_num_groups, - std::size_t *exact_num_groups) const { + std::size_t *max_num_groups) const { // Supports only single group-by key. if (aggregate->grouping_expressions().size() != 1) { return false; @@ -421,9 +421,13 @@ bool ExecutionGenerator::canUseCollisionFreeAggregation( return false; } - // TODO + // TODO(jianqiao): + // 1. Handle the case where min_cpp_value is below 0 or far greater than 0. + // 2. Reason about the upbound (e.g. by checking memory size) instead of + // hardcoding it here. + const std::int64_t kGroupSizeUpbound = 1000000000; if (min_cpp_value < 0 || - max_cpp_value > 1000000000 || + max_cpp_value > kGroupSizeUpbound || max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) { return false; } @@ -436,6 +440,7 @@ bool ExecutionGenerator::canUseCollisionFreeAggregation( return false; } + // TODO(jianqiao): Support AggregationID::AVG. 
switch (agg_func->getAggregate().getAggregationID()) { case AggregationID::kCount: // Fall through case AggregationID::kSum: @@ -450,7 +455,16 @@ bool ExecutionGenerator::canUseCollisionFreeAggregation( } if (arguments.size() == 1) { - switch (arguments.front()->getValueType().getTypeID()) { + const Type &arg_type = arguments.front()->getValueType(); + + // TODO(jianqiao): we need a bit more work in CollisionFreeVectorTable to + // support nullable argument types. That is, we need a bit vector there + // for each aggregation handle to indicate whether the state is NULL. + if (arg_type.isNullable()) { + return false; + } + + switch (arg_type.getTypeID()) { case TypeID::kInt: // Fall through case TypeID::kLong: case TypeID::kFloat: @@ -462,7 +476,7 @@ bool ExecutionGenerator::canUseCollisionFreeAggregation( } } - *exact_num_groups = static_cast<std::size_t>(max_cpp_value) + 1; + *max_num_groups = static_cast<std::size_t>(max_cpp_value) + 1; return true; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/query_optimizer/ExecutionGenerator.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp index b52fe97..987f11a 100644 --- a/query_optimizer/ExecutionGenerator.hpp +++ b/query_optimizer/ExecutionGenerator.hpp @@ -205,9 +205,21 @@ class ExecutionGenerator { */ std::string getNewRelationName(); + /** + * @brief Checks whether an aggregate node can be efficiently evaluated with + * the collision-free aggregation fast path. + * + * @param aggregate The physical aggregate node to be checked. + * @param estimated_num_groups The estimated number of groups for the aggregate. + * @param max_num_groups If collision-free aggregation is applicable, the + * value pointed to by this pointer will be set to the maximum possible + * number of groups that the collision-free hash table needs to hold.
+ * @return A bool value indicating whether collision-free aggregation can be + * used to evaluate \p aggregate. + */ bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate, const std::size_t estimated_num_groups, - std::size_t *exact_num_groups) const; + std::size_t *max_num_groups) const; /** * @brief Sets up the info of the CatalogRelation represented by TableReference. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/relational_operators/FinalizeAggregationOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp index 72beb60..c80c575 100644 --- a/relational_operators/FinalizeAggregationOperator.cpp +++ b/relational_operators/FinalizeAggregationOperator.cpp @@ -45,7 +45,7 @@ bool FinalizeAggregationOperator::getAllWorkOrders( query_context->getAggregationState(aggr_state_index_); DCHECK(agg_state != nullptr); for (std::size_t part_id = 0; - part_id < agg_state->getNumPartitions(); + part_id < agg_state->getNumFinalizationPartitions(); ++part_id) { container->addNormalWorkOrder( new FinalizeAggregationWorkOrder( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/relational_operators/InitializeAggregationOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp index 3da719d..162f909 100644 --- a/relational_operators/InitializeAggregationOperator.cpp +++ b/relational_operators/InitializeAggregationOperator.cpp @@ -72,7 +72,7 @@ bool InitializeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContain } void InitializeAggregationWorkOrder::execute() { - state_->initializeState(partition_id_); + state_->initialize(partition_id_); } } // namespace quickstep 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index 007447f..0461b9e 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -197,8 +197,8 @@ AggregationOperationState::AggregationOperationState( } } - // Aggregation with GROUP BY: create a HashTable pool. if (!group_by_key_ids_.empty()) { + // Aggregation with GROUP BY: create the hash table (pool). if (is_aggregate_collision_free_) { collision_free_hashtable_.reset( AggregationStateHashTableFactory::CreateResizable( @@ -216,11 +216,12 @@ AggregationOperationState::AggregationOperationState( group_by_handles, storage_manager)); } else { - group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries, - hash_table_impl_type, - group_by_types_, - group_by_handles, - storage_manager)); + group_by_hashtable_pool_.reset( + new HashTablePool(estimated_num_entries, + hash_table_impl_type, + group_by_types_, + group_by_handles, + storage_manager)); } } } @@ -352,36 +353,6 @@ bool AggregationOperationState::ProtoIsValid( return true; } -std::size_t AggregationOperationState::getNumPartitions() const { - if (is_aggregate_collision_free_) { - return static_cast<CollisionFreeVectorTable *>( - collision_free_hashtable_.get())->getNumFinalizationPartitions(); - } else if (is_aggregate_partitioned_) { - return partitioned_group_by_hashtable_pool_->getNumPartitions(); - } else { - return 1u; - } -} - -std::size_t AggregationOperationState::getNumInitializationPartitions() const { - if (is_aggregate_collision_free_) { - return static_cast<CollisionFreeVectorTable *>( - collision_free_hashtable_.get())->getNumInitializationPartitions(); - } else { - return 0u; - } -} - -void AggregationOperationState::initializeState(const std::size_t partition_id) { - if 
(is_aggregate_collision_free_) { - static_cast<CollisionFreeVectorTable *>( - collision_free_hashtable_.get())->initialize(partition_id); - } else { - LOG(FATAL) << "AggregationOperationState::initializeState() " - << "is not supported by this aggregation"; - } -} - bool AggregationOperationState::checkAggregatePartitioned( const std::size_t estimated_num_groups, const std::vector<bool> &is_distinct, @@ -404,7 +375,8 @@ bool AggregationOperationState::checkAggregatePartitioned( return false; } - // Currently we require that all the group-by keys are ScalarAttributes. + // Currently we require that all the group-by keys are ScalarAttributes for + // the convenience of implementing copy elision. // TODO(jianqiao): relax this requirement. for (const auto &group_by_element : group_by) { if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) { @@ -420,6 +392,36 @@ bool AggregationOperationState::checkAggregatePartitioned( return false; } +std::size_t AggregationOperationState::getNumInitializationPartitions() const { + if (is_aggregate_collision_free_) { + return static_cast<CollisionFreeVectorTable *>( + collision_free_hashtable_.get())->getNumInitializationPartitions(); + } else { + return 0u; + } +} + +std::size_t AggregationOperationState::getNumFinalizationPartitions() const { + if (is_aggregate_collision_free_) { + return static_cast<CollisionFreeVectorTable *>( + collision_free_hashtable_.get())->getNumFinalizationPartitions(); + } else if (is_aggregate_partitioned_) { + return partitioned_group_by_hashtable_pool_->getNumPartitions(); + } else { + return 1u; + } +} + +void AggregationOperationState::initialize(const std::size_t partition_id) { + if (is_aggregate_collision_free_) { + static_cast<CollisionFreeVectorTable *>( + collision_free_hashtable_.get())->initialize(partition_id); + } else { + LOG(FATAL) << "AggregationOperationState::initializeState() " + << "is not supported by this aggregation"; + } +} + void
AggregationOperationState::aggregateBlock(const block_id input_block, LIPFilterAdaptiveProber *lip_filter_adaptive_prober) { BlockReference block( @@ -521,7 +523,6 @@ void AggregationOperationState::mergeGroupByHashTables( void AggregationOperationState::aggregateBlockHashTable( const ValueAccessorMultiplexer &accessor_mux) { - // TODO if (is_aggregate_collision_free_) { aggregateBlockHashTableImplCollisionFree(accessor_mux); } else if (is_aggregate_partitioned_) { @@ -535,9 +536,9 @@ void AggregationOperationState::aggregateBlockHashTableImplCollisionFree( const ValueAccessorMultiplexer &accessor_mux) { DCHECK(collision_free_hashtable_ != nullptr); - collision_free_hashtable_->upsertValueAccessor(argument_ids_, - group_by_key_ids_, - accessor_mux); + collision_free_hashtable_->upsertValueAccessorCompositeKey(argument_ids_, + group_by_key_ids_, + accessor_mux); } void AggregationOperationState::aggregateBlockHashTableImplPartitioned( @@ -593,9 +594,9 @@ void AggregationOperationState::aggregateBlockHashTableImplPartitioned( ValueAccessorMultiplexer local_mux(base_adapter.get(), derived_adapter.get()); partitioned_group_by_hashtable_pool_->getHashTable(partition) - ->upsertValueAccessor(argument_ids_, - group_by_key_ids_, - local_mux); + ->upsertValueAccessorCompositeKey(argument_ids_, + group_by_key_ids_, + local_mux); } }); } @@ -617,9 +618,9 @@ void AggregationOperationState::aggregateBlockHashTableImplThreadPrivate( AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pool_->getHashTable(); - agg_hash_table->upsertValueAccessor(argument_ids_, - group_by_key_ids_, - accessor_mux); + agg_hash_table->upsertValueAccessorCompositeKey(argument_ids_, + group_by_key_ids_, + accessor_mux); group_by_hashtable_pool_->returnHashTable(agg_hash_table); } @@ -674,9 +675,8 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree( CollisionFreeVectorTable *hash_table = static_cast<CollisionFreeVectorTable *>(collision_free_hashtable_.get()); - // TODO 
const std::size_t max_length = - hash_table->getNumTuplesInPartition(partition_id); + hash_table->getNumTuplesInFinalizationPartition(partition_id); ColumnVectorsValueAccessor complete_result; DCHECK_EQ(1u, group_by_types_.size()); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index 783d7bc..27d7eb9 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -160,6 +160,29 @@ class AggregationOperationState { const CatalogDatabaseLite &database); /** + * @brief Get the number of partitions to be used for initializing the + * aggregation. + * + * @return The number of partitions to be used for initializing the aggregation. + **/ + std::size_t getNumInitializationPartitions() const; + + /** + * @brief Get the number of partitions to be used for finalizing the + * aggregation. + * + * @return The number of partitions to be used for finalizing the aggregation. + **/ + std::size_t getNumFinalizationPartitions() const; + + /** + * @brief Initialize the specified partition of this aggregation. + * + * @param partition_id ID of the partition to be initialized. + */ + void initialize(const std::size_t partition_id); + + /** * @brief Compute aggregates on the tuples of the given storage block, * updating the running state maintained by this * AggregationOperationState. @@ -183,17 +206,8 @@ class AggregationOperationState { void finalizeAggregate(const std::size_t partition_id, InsertDestination *output_destination); - /** - * @brief Get the number of partitions to be used for the aggregation. - * For non-partitioned aggregations, we return 1. 
- **/ - std::size_t getNumPartitions() const; - - std::size_t getNumInitializationPartitions() const; - - void initializeState(const std::size_t partition_id); - private: + // Check whether partitioned aggregation can be applied. bool checkAggregatePartitioned( const std::size_t estimated_num_groups, const std::vector<bool> &is_distinct, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/CollisionFreeVectorTable.cpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp index 841db25..f2bc23b 100644 --- a/storage/CollisionFreeVectorTable.cpp +++ b/storage/CollisionFreeVectorTable.cpp @@ -39,23 +39,22 @@ namespace quickstep { CollisionFreeVectorTable::CollisionFreeVectorTable( - const std::vector<const Type *> &key_types, + const Type *key_type, const std::size_t num_entries, const std::vector<AggregationHandle *> &handles, StorageManager *storage_manager) - : key_type_(key_types.front()), + : key_type_(key_type), num_entries_(num_entries), num_handles_(handles.size()), handles_(handles), - num_finalize_partitions_(std::min((num_entries_ >> 12u) + 1u, 80uL)), + num_finalize_partitions_(CalculateNumFinalizationPartitions(num_entries_)), storage_manager_(storage_manager) { - CHECK_EQ(1u, key_types.size()); DCHECK_GT(num_entries, 0u); - std::map<std::string, std::size_t> memory_offsets; std::size_t required_memory = 0; + const std::size_t existence_map_offset = 0; + std::vector<std::size_t> state_offsets; - memory_offsets.emplace("existence_map", required_memory); required_memory += CacheLineAlignedBytes( BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries)); @@ -89,8 +88,7 @@ CollisionFreeVectorTable::CollisionFreeVectorTable( LOG(FATAL) << "Not implemented"; } - memory_offsets.emplace(std::string("state") + std::to_string(i), - required_memory); + state_offsets.emplace_back(required_memory); required_memory += 
CacheLineAlignedBytes(state_size * num_entries); } @@ -102,19 +100,18 @@ CollisionFreeVectorTable::CollisionFreeVectorTable( void *memory_start = blob_->getMemoryMutable(); existence_map_.reset(new BarrieredReadWriteConcurrentBitVector( - reinterpret_cast<char *>(memory_start) + memory_offsets.at("existence_map"), + reinterpret_cast<char *>(memory_start) + existence_map_offset, num_entries, false /* initialize */)); for (std::size_t i = 0; i < num_handles_; ++i) { + // Columnwise layout. vec_tables_.emplace_back( - reinterpret_cast<char *>(memory_start) + - memory_offsets.at(std::string("state") + std::to_string(i))); + reinterpret_cast<char *>(memory_start) + state_offsets.at(i)); } memory_size_ = required_memory; - num_init_partitions_ = - std::max(1uL, std::min(memory_size_ / (4uL * 1024u * 1024u), 80uL)); + num_init_partitions_ = CalculateNumInitializationPartitions(memory_size_); } CollisionFreeVectorTable::~CollisionFreeVectorTable() { @@ -126,7 +123,7 @@ CollisionFreeVectorTable::~CollisionFreeVectorTable() { void CollisionFreeVectorTable::destroyPayload() { } -bool CollisionFreeVectorTable::upsertValueAccessor( +bool CollisionFreeVectorTable::upsertValueAccessorCompositeKey( const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, const std::vector<MultiSourceAttributeId> &key_ids, const ValueAccessorMultiplexer &accessor_mux) { @@ -183,6 +180,7 @@ bool CollisionFreeVectorTable::upsertValueAccessor( is_argument_nullable = argument_type->isNullable(); } + // Dispatch to specialized implementations to achieve maximum performance. 
InvokeOnValueAccessorMaybeTupleIdSequenceAdapter( base_accessor, [&](auto *accessor) -> void { // NOLINT(build/c++11) @@ -260,7 +258,7 @@ void CollisionFreeVectorTable::finalizeKey(const std::size_t partition_id, } void CollisionFreeVectorTable::finalizeState(const std::size_t partition_id, - std::size_t handle_id, + const std::size_t handle_id, NativeColumnVector *output_cv) const { const std::size_t start_position = calculatePartitionStartPosition(partition_id); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/CollisionFreeVectorTable.hpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp index cbe3b81..102b696 100644 --- a/storage/CollisionFreeVectorTable.hpp +++ b/storage/CollisionFreeVectorTable.hpp @@ -57,8 +57,17 @@ class StorageMnager; class CollisionFreeVectorTable : public AggregationStateHashTableBase { public: + /** + * @brief Constructor. + * + * @param key_type The group-by key type. + * @param num_entries The estimated number of entries this table will hold. + * @param handles The aggregation handles. + * @param storage_manager The StorageManager to use (a StorageBlob will be + * allocated to hold this table's contents). + **/ CollisionFreeVectorTable( - const std::vector<const Type *> &key_types, + const Type *key_type, const std::size_t num_entries, const std::vector<AggregationHandle *> &handles, StorageManager *storage_manager); @@ -67,15 +76,30 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase { void destroyPayload() override; + /** + * @brief Get the number of partitions to be used for initializing the table. + * + * @return The number of partitions to be used for initializing the table. + */ inline std::size_t getNumInitializationPartitions() const { return num_init_partitions_; } + /** + * @brief Get the number of partitions to be used for finalizing the aggregation. 
+ * + * @return The number of partitions to be used for finalizing the aggregation. + */ inline std::size_t getNumFinalizationPartitions() const { return num_finalize_partitions_; } - inline std::size_t getNumTuplesInPartition( + /** + * @brief Get the exact number of tuples in the specified finalization partition. + * + * @return The exact number of tuples in the specified finalization partition. + */ + inline std::size_t getNumTuplesInFinalizationPartition( const std::size_t partition_id) const { const std::size_t start_position = calculatePartitionStartPosition(partition_id); @@ -84,6 +108,11 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase { return existence_map_->onesCountInRange(start_position, end_position); } + /** + * @brief Initialize the specified partition of this aggregation table. + * + * @param partition_id ID of the partition to be initialized. + */ inline void initialize(const std::size_t partition_id) { const std::size_t memory_segment_size = (memory_size_ + num_init_partitions_ - 1) / num_init_partitions_; @@ -93,16 +122,46 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase { std::min(memory_segment_size, memory_size_ - memory_start)); } - bool upsertValueAccessor( + /** + * @brief Use aggregation handles to update (multiple) aggregation states in + * this vector table, with group-by keys and arguments drawn from the + * given ValueAccessors. + * + * @param argument_ids The multi-source attribute IDs of each argument + * component to be read from \p accessor_mux. + * @param key_ids The multi-source attribute IDs of each group-by key + * component to be read from \p accessor_mux. + * @param accessor_mux A ValueAccessorMultiplexer object that contains the + * ValueAccessors which will be used to access keys. beginIteration() + * should be called on the accessors before calling this method. + * @return Always return true. 
+ **/ + bool upsertValueAccessorCompositeKey( const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, const std::vector<MultiSourceAttributeId> &key_ids, const ValueAccessorMultiplexer &accessor_mux) override; + /** + * @brief Copy the keys from this table to a NativeColumnVector, for the + * specified partition. + * + * @param partition_id ID of the partition to copy keys from. + * @param output_cv The NativeColumnVector to copy keys to. + */ void finalizeKey(const std::size_t partition_id, NativeColumnVector *output_cv) const; + + /** + * @brief Finalize the aggregation states to a NativeColumnVector, for the + * specified partition and aggregation handle. + * + * @param partition_id ID of the partition to finalize. + * @param handle_id ID of the aggregation handle to finalize. + * @param output_cv The NativeColumnVector to write finalized values to. + */ void finalizeState(const std::size_t partition_id, - std::size_t handle_id, + const std::size_t handle_id, NativeColumnVector *output_cv) const; private: @@ -110,6 +169,28 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase { return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes; } + inline static std::size_t CalculateNumInitializationPartitions( + const std::size_t memory_size) { + // Set initialization memory block size as 4MB. + constexpr std::size_t kInitBlockSize = 4uL * 1024u * 1024u; + + // At least 1 partition, at most 80 partitions. + // TODO(jianqiao): set the upbound as (# of workers * 2) instead of the + // hardcoded 80. + return std::max(1uL, std::min(memory_size / kInitBlockSize, 80uL)); + } + + inline static std::size_t CalculateNumFinalizationPartitions( + const std::size_t num_entries) { + // Set finalization segment size as 4096 entries. + constexpr std::size_t kFinalizeSegmentSize = 4uL * 1024L; + + // At least 1 partition, at most 80 partitions. 
+ // TODO(jianqiao): set the upbound as (# of workers * 2) instead of the + // hardcoded 80. + return std::max(1uL, std::min(num_entries / kFinalizeSegmentSize, 80uL)); + } + inline std::size_t calculatePartitionLength() const { const std::size_t partition_length = (num_entries_ + num_finalize_partitions_ - 1) / num_finalize_partitions_; @@ -214,79 +295,29 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase { template <typename KeyT> inline void finalizeKeyInternal(const std::size_t start_position, const std::size_t end_position, - NativeColumnVector *output_cv) const { - std::size_t loc = start_position - 1; - while ((loc = existence_map_->nextOne(loc)) < end_position) { - *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc; - } - } + NativeColumnVector *output_cv) const; template <typename ...ArgTypes> inline void finalizeStateDispatchHelper(const AggregationID agg_id, const Type *argument_type, const void *vec_table, - ArgTypes &&...args) const { - switch (agg_id) { - case AggregationID::kCount: - finalizeStateCount(static_cast<const std::atomic<std::size_t> *>(vec_table), - std::forward<ArgTypes>(args)...); - return; - case AggregationID::kSum: - finalizeStateSumHelper(argument_type, - vec_table, - std::forward<ArgTypes>(args)...); - return; - default: - LOG(FATAL) << "Not supported"; - } - } + ArgTypes &&...args) const; template <typename ...ArgTypes> inline void finalizeStateSumHelper(const Type *argument_type, const void *vec_table, - ArgTypes &&...args) const { - DCHECK(argument_type != nullptr); - - switch (argument_type->getTypeID()) { - case TypeID::kInt: // Fall through - case TypeID::kLong: - finalizeStateSum<std::int64_t>( - static_cast<const std::atomic<std::int64_t> *>(vec_table), - std::forward<ArgTypes>(args)...); - return; - case TypeID::kFloat: // Fall through - case TypeID::kDouble: - finalizeStateSum<double>( - static_cast<const std::atomic<double> *>(vec_table), - std::forward<ArgTypes>(args)...); - return; - 
default: - LOG(FATAL) << "Not supported"; - } - } + ArgTypes &&...args) const; inline void finalizeStateCount(const std::atomic<std::size_t> *vec_table, const std::size_t start_position, const std::size_t end_position, - NativeColumnVector *output_cv) const { - std::size_t loc = start_position - 1; - while ((loc = existence_map_->nextOne(loc)) < end_position) { - *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) = - vec_table[loc].load(std::memory_order_relaxed); - } - } + NativeColumnVector *output_cv) const; template <typename ResultT, typename StateT> inline void finalizeStateSum(const std::atomic<StateT> *vec_table, const std::size_t start_position, const std::size_t end_position, - NativeColumnVector *output_cv) const { - std::size_t loc = start_position - 1; - while ((loc = existence_map_->nextOne(loc)) < end_position) { - *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) = - vec_table[loc].load(std::memory_order_relaxed); - } - } + NativeColumnVector *output_cv) const; const Type *key_type_; const std::size_t num_entries_; @@ -616,6 +647,88 @@ inline void CollisionFreeVectorTable } } +template <typename KeyT> +inline void CollisionFreeVectorTable + ::finalizeKeyInternal(const std::size_t start_position, + const std::size_t end_position, + NativeColumnVector *output_cv) const { + std::size_t loc = start_position - 1; + while ((loc = existence_map_->nextOne(loc)) < end_position) { + *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc; + } +} + +template <typename ...ArgTypes> +inline void CollisionFreeVectorTable + ::finalizeStateDispatchHelper(const AggregationID agg_id, + const Type *argument_type, + const void *vec_table, + ArgTypes &&...args) const { + switch (agg_id) { + case AggregationID::kCount: + finalizeStateCount(static_cast<const std::atomic<std::size_t> *>(vec_table), + std::forward<ArgTypes>(args)...); + return; + case AggregationID::kSum: + finalizeStateSumHelper(argument_type, + vec_table, + 
std::forward<ArgTypes>(args)...); + return; + default: + LOG(FATAL) << "Not supported"; + } +} + +template <typename ...ArgTypes> +inline void CollisionFreeVectorTable + ::finalizeStateSumHelper(const Type *argument_type, + const void *vec_table, + ArgTypes &&...args) const { + DCHECK(argument_type != nullptr); + + switch (argument_type->getTypeID()) { + case TypeID::kInt: // Fall through + case TypeID::kLong: + finalizeStateSum<std::int64_t>( + static_cast<const std::atomic<std::int64_t> *>(vec_table), + std::forward<ArgTypes>(args)...); + return; + case TypeID::kFloat: // Fall through + case TypeID::kDouble: + finalizeStateSum<double>( + static_cast<const std::atomic<double> *>(vec_table), + std::forward<ArgTypes>(args)...); + return; + default: + LOG(FATAL) << "Not supported"; + } +} + +inline void CollisionFreeVectorTable + ::finalizeStateCount(const std::atomic<std::size_t> *vec_table, + const std::size_t start_position, + const std::size_t end_position, + NativeColumnVector *output_cv) const { + std::size_t loc = start_position - 1; + while ((loc = existence_map_->nextOne(loc)) < end_position) { + *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) = + vec_table[loc].load(std::memory_order_relaxed); + } +} + +template <typename ResultT, typename StateT> +inline void CollisionFreeVectorTable + ::finalizeStateSum(const std::atomic<StateT> *vec_table, + const std::size_t start_position, + const std::size_t end_position, + NativeColumnVector *output_cv) const { + std::size_t loc = start_position - 1; + while ((loc = existence_map_->nextOne(loc)) < end_position) { + *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) = + vec_table[loc].load(std::memory_order_relaxed); + } +} + } // namespace quickstep #endif // QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/HashTableBase.hpp ---------------------------------------------------------------------- 
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp index 064c7cb..b4b6918 100644 --- a/storage/HashTableBase.hpp +++ b/storage/HashTableBase.hpp @@ -78,12 +78,6 @@ class HashTableBase { public: virtual ~HashTableBase() {} - /** - * @brief Destroy the payload stored in the hash table. - **/ - virtual void destroyPayload() { - } - protected: HashTableBase() {} @@ -111,8 +105,10 @@ class AggregationStateHashTableBase { * * Optionally, we can also remove the AggregationStateHashTableBase * specialization from this file. + * + * TODO(jianqiao): Refactor the interface design for aggregation hash table. **/ - virtual bool upsertValueAccessor( + virtual bool upsertValueAccessorCompositeKey( const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, const std::vector<MultiSourceAttributeId> &key_attr_ids, const ValueAccessorMultiplexer &accessor_mux) = 0; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/HashTableFactory.hpp ---------------------------------------------------------------------- diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp index b88bf87..9686429 100644 --- a/storage/HashTableFactory.hpp +++ b/storage/HashTableFactory.hpp @@ -334,10 +334,28 @@ typedef HashTableFactory<TupleReference, true, false, false, true> JoinHashTableFactory; /** - * @brief TODO - */ + * @brief Factory class that makes it easier to instantiate aggregation state + * hash tables. + **/ class AggregationStateHashTableFactory { public: + /** + * @brief Create a new aggregation state hash table, with the type selected by + * hash_table_type. Other parameters are forwarded to the hash table's + * constructor. + * + * @param hash_table_type The specific hash table implementation that should + * be used. + * @param key_types A vector of one or more types (>1 indicates a composite + * key). Forwarded as-is to the hash table's constructor.
+ * @param num_entries The estimated number of entries the hash table will + * hold. Forwarded as-is to the hash table's constructor. + * @param storage_manager The StorageManager to use (a StorageBlob will be + * allocated to hold the hash table's contents). Forwarded as-is to the + * hash table constructor. + * @return A new aggregation state hash table. + **/ + static AggregationStateHashTableBase* CreateResizable( const HashTableImplType hash_table_type, const std::vector<const Type*> &key_types, @@ -349,11 +367,12 @@ class AggregationStateHashTableFactory { return new PackedPayloadHashTable( key_types, num_entries, handles, storage_manager); case HashTableImplType::kCollisionFreeVector: + DCHECK_EQ(1u, key_types.size()); return new CollisionFreeVectorTable( - key_types, num_entries, handles, storage_manager); + key_types.front(), num_entries, handles, storage_manager); default: { LOG(FATAL) << "Unrecognized HashTableImplType in " - << "AggregationStateHashTableFactory::createResizable()\n"; + << "AggregationStateHashTableFactory::createResizable()"; } } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/PackedPayloadHashTable.cpp ---------------------------------------------------------------------- diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp index 06e7687..2875aa9 100644 --- a/storage/PackedPayloadHashTable.cpp +++ b/storage/PackedPayloadHashTable.cpp @@ -203,7 +203,7 @@ void PackedPayloadHashTable::destroyPayload() { } } -bool PackedPayloadHashTable::upsertValueAccessor( +bool PackedPayloadHashTable::upsertValueAccessorCompositeKey( const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, const std::vector<MultiSourceAttributeId> &key_attr_ids, const ValueAccessorMultiplexer &accessor_mux) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/PackedPayloadHashTable.hpp ---------------------------------------------------------------------- diff 
--git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp index 5e1b177..c5d5369 100644 --- a/storage/PackedPayloadHashTable.hpp +++ b/storage/PackedPayloadHashTable.hpp @@ -58,8 +58,30 @@ namespace quickstep { * @{ */ +/** + * @brief Aggregation hash table implementation in which the payload can be just + * a bunch of bytes. This implementation is suitable for aggregation with + * multiple aggregation handles (e.g. SUM, MAX, MIN etc). + * + * At present the hash table uses separate chaining to resolve collisions, i.e. + * Keys/values are stored in a separate region of memory from the base hash + * table slot array. Every bucket has a "next" pointer so that entries that + * collide (i.e. map to the same base slot) form chains of pointers with each + * other. + **/ class PackedPayloadHashTable : public AggregationStateHashTableBase { public: + /** + * @brief Constructor. + * + * @param key_types A vector of one or more types (>1 indicates a composite + * key). + * @param num_entries The estimated number of entries this hash table will + * hold. + * @param handles The aggregation handles. + * @param storage_manager The StorageManager to use (a StorageBlob will be + * allocated to hold this hash table's contents). + **/ PackedPayloadHashTable( const std::vector<const Type *> &key_types, const std::size_t num_entries, @@ -68,47 +90,228 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase { ~PackedPayloadHashTable() override; + /** + * @brief Erase all entries in this hash table. + * + * @warning This method is not guaranteed to be threadsafe. + **/ void clear(); void destroyPayload() override; - bool upsertValueAccessor( + /** + * @brief Use aggregation handles to update (multiple) aggregation states in + * this hash table, with group-by keys and arguments drawn from the + * given ValueAccessors. New states are first inserted if not already + * present. 
+ * + * @note This method is threadsafe with regard to other calls to + * upsertCompositeKey() and upsertValueAccessorCompositeKey(). + * + * @param argument_ids The multi-source attribute IDs of each argument + * component to be read from \p accessor_mux. + * @param key_ids The multi-source attribute IDs of each group-by key + * component to be read from \p accessor_mux. + * @param accessor_mux A ValueAccessorMultiplexer object that contains the + * ValueAccessors which will be used to access keys. beginIteration() + * should be called on the accessors before calling this method. + * @return True on success, false if upsert failed because there was not + * enough space to insert new entries for all the keys in accessor + * (note that some entries may still have been upserted, and + * accessors' iterations will be left on the first tuple which could + * not be inserted). + **/ + bool upsertValueAccessorCompositeKey( const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, const std::vector<MultiSourceAttributeId> &key_ids, const ValueAccessorMultiplexer &accessor_mux) override; + /** + * @return The ID of the StorageBlob used to store this hash table. + **/ inline block_id getBlobId() const { return blob_->getID(); } + /** + * @warning This method assumes that no concurrent calls to + * upsertCompositeKey() or upsertValueAccessorCompositeKey() are + * taking place (i.e. that this HashTable is immutable for the + * duration of the call). + * Concurrent calls to getSingleCompositeKey(), forEach(), and + * forEachCompositeKey() are safe. + * + * @return The number of entries in this HashTable. + **/ inline std::size_t numEntries() const { return header_->buckets_allocated.load(std::memory_order_relaxed); } + /** + * @brief Use aggregation handles to merge the given aggregation states into + * the aggregation states mapped to the given key. New states are first + * inserted if not already present. + * + * @warning The key must not be null. 
+ * @note This method is threadsafe with regard to other calls to + * upsertCompositeKey() and upsertValueAccessorCompositeKey(). + * + * @param key The key. + * @param source_state The source aggregation states to be merged into this + * hash table. + * @return True on success, false if upsert failed because there was not + * enough space to insert a new entry in this hash table. + **/ inline bool upsertCompositeKey(const std::vector<TypedValue> &key, const std::uint8_t *source_state); + /** + * @brief Apply a functor to an aggregation state mapped to the given key. + * A new state is first inserted if one is not already present. + * + * @warning The key must not be null. + * @note This method is threadsafe with regard to other calls to + * upsertCompositeKey() and upsertValueAccessorCompositeKey(). + * + * @param key The key. + * @param functor A pointer to a functor, which should provide a call + * operator which takes an aggregation state (of type std::uint8_t *) + * as an argument. + * @param index The index of the target aggregation state among those states + * mapped to \p key. + * @return True on success, false if upsert failed because there was not + * enough space to insert a new entry in this hash table. + **/ template <typename FunctorT> inline bool upsertCompositeKey(const std::vector<TypedValue> &key, FunctorT *functor, - int index); + const std::size_t index); + /** + * @brief Lookup a composite key against this hash table to find a matching + * entry. + * + * @warning The key must not be null. + * @warning This method assumes that no concurrent calls to + * upsertCompositeKey() or upsertValueAccessorCompositeKey() are + * taking place (i.e. that this HashTable is immutable for the + * duration of the call and as long as the returned pointer may be + * dereferenced). Concurrent calls to getSingleCompositeKey(), + * forEach(), and forEachCompositeKey() are safe. + * + * @param key The key to look up. 
+ * @return The value of a matched entry if a matching key is found. + * Otherwise, return NULL. + **/ inline const std::uint8_t* getSingleCompositeKey( const std::vector<TypedValue> &key) const; + /** + * @brief Lookup a composite key against this hash table to find a matching + * entry. Then return the aggregation state component with the + * specified index. + * + * @warning The key must not be null. + * @warning This method assumes that no concurrent calls to + * upsertCompositeKey() or upsertValueAccessorCompositeKey() are + * taking place (i.e. that this HashTable is immutable for the + * duration of the call and as long as the returned pointer may be + * dereferenced). Concurrent calls to getSingleCompositeKey(), + * forEach(), and forEachCompositeKey() are safe. + * + * @param key The key to look up. + * @param index The index of the target aggregation state among those states + * mapped to \p key. + * @return The aggregation state of the specified index if a matching key is + * found. Otherwise, return NULL. + **/ inline const std::uint8_t* getSingleCompositeKey( const std::vector<TypedValue> &key, - const int index) const; + const std::size_t index) const; + /** + * @brief Apply a functor to each (key, value) pair in this hash table. + * + * @warning This method assumes that no concurrent calls to + * upsertCompositeKey() or upsertValueAccessorCompositeKey() are + * taking place (i.e. that this HashTable is immutable for the + * duration of the call and as long as the returned pointer may be + * dereferenced). Concurrent calls to getSingleCompositeKey(), + * forEach(), and forEachCompositeKey() are safe. + * + * @param functor A pointer to a functor, which should provide a call operator + * which takes 2 arguments: const TypedValue&, const std::uint8_t*. + * The call operator will be invoked once on each key, value pair in + * this hash table. + * @return The number of key-value pairs visited. 
+ **/ template <typename FunctorT> inline std::size_t forEach(FunctorT *functor) const; + /** + * @brief Apply a functor to each (key, aggregation state) pair in this hash + * table, where the aggregation state is retrieved from the value + * that maps to the corresponding key with the specified index. + * + * @warning This method assumes that no concurrent calls to + * upsertCompositeKey() or upsertValueAccessorCompositeKey() are + * taking place (i.e. that this HashTable is immutable for the + * duration of the call and as long as the returned pointer may be + * dereferenced). Concurrent calls to getSingleCompositeKey(), + * forEach(), and forEachCompositeKey() are safe. + * + * @param functor A pointer to a functor, which should provide a call operator + * which takes 2 arguments: const TypedValue&, const std::uint8_t*. + * The call operator will be invoked once on each (key, aggregation state) + * pair in this hash table. + * @param index The index of the target aggregation state among those states + * mapped to \p key. + * @return The number of key-value pairs visited. + **/ template <typename FunctorT> inline std::size_t forEach(FunctorT *functor, const int index) const; + /** + * @brief Apply a functor to each key, value pair in this hash table. + * Composite key version. + * + * @warning This method assumes that no concurrent calls to + * upsertCompositeKey() or upsertValueAccessorCompositeKey() are + * taking place (i.e. that this HashTable is immutable for the + * duration of the call and as long as the returned pointer may be + * dereferenced). Concurrent calls to getSingleCompositeKey(), + * forEach(), and forEachCompositeKey() are safe. + * + * @param functor A pointer to a functor, which should provide a call operator + * which takes 2 arguments: const TypedValue&, const std::uint8_t*. + * The call operator will be invoked once on each key, value pair in + * this hash table. + * @return The number of key-value pairs visited. 
+ **/ template <typename FunctorT> inline std::size_t forEachCompositeKey(FunctorT *functor) const; + /** + * @brief Apply a functor to each (key, aggregation state) pair in this hash + * table, where the aggregation state is retrieved from the value + * that maps to the corresponding key with the specified index. + * Composite key version. + * + * @warning This method assumes that no concurrent calls to + * upsertCompositeKey() or upsertValueAccessorCompositeKey() are + * taking place (i.e. that this HashTable is immutable for the + * duration of the call and as long as the returned pointer may be + * dereferenced). Concurrent calls to getSingleCompositeKey(), + * forEach(), and forEachCompositeKey() are safe. + * + * @param functor A pointer to a functor, which should provide a call operator + * which takes 2 arguments: const TypedValue&, const std::uint8_t*. + * The call operator will be invoked once on each (key, aggregation state) + * pair in this hash table. + * @param index The index of the target aggregation state among those states + * mapped to \p key. + * @return The number of key-value pairs visited. 
+ **/ template <typename FunctorT> inline std::size_t forEachCompositeKey(FunctorT *functor, const std::size_t index) const; @@ -495,7 +698,7 @@ inline const std::uint8_t* PackedPayloadHashTable::getSingleCompositeKey( inline const std::uint8_t* PackedPayloadHashTable::getSingleCompositeKey( const std::vector<TypedValue> &key, - const int index) const { + const std::size_t index) const { DEBUG_ASSERT(this->key_types_.size() == key.size()); const std::size_t hash_code = this->hashCompositeKey(key); @@ -549,7 +752,7 @@ template <typename FunctorT> inline bool PackedPayloadHashTable::upsertCompositeKey( const std::vector<TypedValue> &key, FunctorT *functor, - int index) { + const std::size_t index) { const std::size_t variable_size = calculateVariableLengthCompositeKeyCopySize(key); for (;;) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/dee650f6/storage/ValueAccessorMultiplexer.hpp ---------------------------------------------------------------------- diff --git a/storage/ValueAccessorMultiplexer.hpp b/storage/ValueAccessorMultiplexer.hpp index 0ed6175..94773a2 100644 --- a/storage/ValueAccessorMultiplexer.hpp +++ b/storage/ValueAccessorMultiplexer.hpp @@ -39,6 +39,10 @@ enum class ValueAccessorSource { kInvalid }; +/** + * @brief A data structure for representing attribute ids referring to multiple + * ValueAccessors. + */ struct MultiSourceAttributeId { MultiSourceAttributeId(const ValueAccessorSource in_source, const attribute_id in_attr_id) @@ -53,21 +57,59 @@ struct MultiSourceAttributeId { const attribute_id attr_id; }; +/** + * @brief A class that encapsulates multiple ValueAccessors and provides helper + * methods for accessing the ValueAccessors with MultiSourceAttributeId. + * + * This class is in its very initial form that serves a small set of essential + * functionalities for the purpose of aggregation copy elision. That is, given a + * storage block to be aggregated on, we may have aggregations on a storage + * attribute (e.g. 
SUM(x)) or on a non-trivial expression (e.g. SUM(x * y)). + * For the former case, copy elision is applicable: the attribute gets accessed + * directly from the storage block. In the latter case, we have to create a + * temporary data structure (i.e. ColumnVectorsValueAccessor) that stores the + * intermediate results. Thus, we refer to the ValueAccessor created directly + * from the storage block as the BASE accessor and the intermediate result + * ColumnVectorsValueAccessor as the DERIVED accessor. And we utilize this class + * (ValueAccessorMultiplexer) to pass both accessors around to enable copy elision. + * + * This class (together with ValueAccessorSource and MultiSourceAttributeId) + * may be rewritten or extended later to more generally support copy elisions + * in various scenarios. + */ class ValueAccessorMultiplexer { public: + /** + * @brief Constructor. + * + * @param base_accessor The base accessor. + * @param derived_accessor The derived accessor. + */ ValueAccessorMultiplexer(ValueAccessor *base_accessor, ValueAccessor *derived_accessor) : base_accessor_(base_accessor), derived_accessor_(derived_accessor) {} + /** + * @return The base accessor. + */ inline ValueAccessor* getBaseAccessor() const { return base_accessor_; } + /** + * @return The derived accessor. + */ inline ValueAccessor* getDerivedAccessor() const { return derived_accessor_; } + /** + * @brief Get the value accessor that corresponds to the specified source. + * + * @param source The value accessor source. + * @return The value accessor that corresponds to \p source. + */ inline ValueAccessor* getValueAccessorBySource( const ValueAccessorSource &source) const { switch (source) {