http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index 591e3a1..44803fc 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -33,7 +33,9 @@ #include "storage/HashTableBase.hpp" #include "storage/HashTablePool.hpp" #include "storage/PartitionedHashTablePool.hpp" +#include "storage/StorageBlock.hpp" #include "storage/StorageBlockInfo.hpp" +#include "utility/ConcurrentBitVector.hpp" #include "utility/Macros.hpp" #include "gflags/gflags.h" @@ -43,9 +45,11 @@ namespace quickstep { class AggregateFunction; class CatalogDatabaseLite; class CatalogRelationSchema; +class ColumnVectorsValueAccessor; class InsertDestination; class LIPFilterAdaptiveProber; class StorageManager; +class TupleIdSequence; DECLARE_int32(num_aggregation_partitions); DECLARE_int32(partition_aggregation_num_groups_threshold); @@ -166,127 +170,99 @@ class AggregationOperationState { * the block. **/ void aggregateBlock(const block_id input_block, - LIPFilterAdaptiveProber *lip_filter_adaptive_prober); + LIPFilterAdaptiveProber *lip_filter_adaptive_prober = nullptr); /** * @brief Generate the final results for the aggregates managed by this * AggregationOperationState and write them out to StorageBlock(s). * + * @param partition_id The partition id of this finalize operation. * @param output_destination An InsertDestination where the finalized output * tuple(s) from this aggregate are to be written. **/ - void finalizeAggregate(InsertDestination *output_destination); - - /** - * @brief Destroy the payloads in the aggregation hash tables. - **/ - void destroyAggregationHashTablePayload(); - - /** - * @brief Generate the final results for the aggregates managed by this - * AggregationOperationState and write them out to StorageBlock(s). 
- * In this implementation, each thread picks a hash table belonging to - * a partition and writes its values to StorageBlock(s). There is no - * need to merge multiple hash tables in one, because there is no - * overlap in the keys across two hash tables. - * - * @param partition_id The ID of the partition for which finalize is being - * performed. - * @param output_destination An InsertDestination where the finalized output - * tuple(s) from this aggregate are to be written. - **/ - void finalizeAggregatePartitioned( - const std::size_t partition_id, InsertDestination *output_destination); - - static void mergeGroupByHashTables(AggregationStateHashTableBase *src, - AggregationStateHashTableBase *dst); - - bool isAggregatePartitioned() const { - return is_aggregate_partitioned_; - } + void finalizeAggregate(const std::size_t partition_id, + InsertDestination *output_destination); /** * @brief Get the number of partitions to be used for the aggregation. * For non-partitioned aggregations, we return 1. **/ - std::size_t getNumPartitions() const { - return is_aggregate_partitioned_ - ? partitioned_group_by_hashtable_pool_->getNumPartitions() - : 1; - } + std::size_t getNumPartitions() const; - int dflag; + std::size_t getNumInitializationPartitions() const; + + void initializeState(const std::size_t partition_id); private: - // Merge locally (per storage block) aggregated states with global aggregation - // states. - void mergeSingleState( - const std::vector<std::unique_ptr<AggregationState>> &local_state); + bool checkAggregatePartitioned( + const std::size_t estimated_num_groups, + const std::vector<bool> &is_distinct, + const std::vector<std::unique_ptr<const Scalar>> &group_by, + const std::vector<const AggregateFunction *> &aggregate_functions) const; // Aggregate on input block. 
void aggregateBlockSingleState(const block_id input_block); void aggregateBlockHashTable(const block_id input_block, LIPFilterAdaptiveProber *lip_filter_adaptive_prober); - void finalizeSingleState(InsertDestination *output_destination); - void finalizeHashTable(InsertDestination *output_destination); + // Merge locally (per storage block) aggregated states with global aggregation + // states. + void mergeSingleState( + const std::vector<std::unique_ptr<AggregationState>> &local_state); + void mergeGroupByHashTables(AggregationStateHashTableBase *src, + AggregationStateHashTableBase *dst) const; - bool checkAggregatePartitioned( - const std::size_t estimated_num_groups, - const std::vector<bool> &is_distinct, - const std::vector<std::unique_ptr<const Scalar>> &group_by, - const std::vector<const AggregateFunction *> &aggregate_functions) const { - // If there's no aggregation, return false. - if (aggregate_functions.empty()) { - return false; - } - // Check if there's a distinct operation involved in any aggregate, if so - // the aggregate can't be partitioned. - for (auto distinct : is_distinct) { - if (distinct) { - return false; - } - } - // There's no distinct aggregation involved, Check if there's at least one - // GROUP BY operation. - if (group_by.empty()) { - return false; - } - // There are GROUP BYs without DISTINCT. Check if the estimated number of - // groups is large enough to warrant a partitioned aggregation. - return estimated_num_groups > - static_cast<std::size_t>( - FLAGS_partition_aggregation_num_groups_threshold); - } + // Finalize the aggregation results into output_destination. + void finalizeSingleState(InsertDestination *output_destination); + void finalizeHashTable(const std::size_t partition_id, + InsertDestination *output_destination); + + // Specialized implementations for aggregateBlockHashTable. 
+ void aggregateBlockHashTableImplCollisionFree(ValueAccessor *accessor, + ColumnVectorsValueAccessor *aux_accessor); + void aggregateBlockHashTableImplPartitioned(ValueAccessor *accessor, + ColumnVectorsValueAccessor *aux_accessor); + void aggregateBlockHashTableImplThreadPrivate(ValueAccessor *accessor, + ColumnVectorsValueAccessor *aux_accessor); + + // Specialized implementations for finalizeHashTable. + void finalizeHashTableImplCollisionFree(const std::size_t partition_id, + InsertDestination *output_destination); + void finalizeHashTableImplPartitioned(const std::size_t partition_id, + InsertDestination *output_destination); + void finalizeHashTableImplThreadPrivate(InsertDestination *output_destination); // Common state for all aggregates in this operation: the input relation, the // filter predicate (if any), and the list of GROUP BY expressions (if any). const CatalogRelationSchema &input_relation_; + // Whether the aggregation is collision free or not. + bool is_aggregate_collision_free_; + // Whether the aggregation is partitioned or not. - const bool is_aggregate_partitioned_; + bool is_aggregate_partitioned_; std::unique_ptr<const Predicate> predicate_; - std::vector<std::unique_ptr<const Scalar>> group_by_list_; // Each individual aggregate in this operation has an AggregationHandle and - // some number of Scalar arguments. - std::vector<AggregationHandle *> handles_; - std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_; + // zero (indicated by -1) or one argument. + std::vector<std::unique_ptr<AggregationHandle>> handles_; // For each aggregate, whether DISTINCT should be applied to the aggregate's // arguments. std::vector<bool> is_distinct_; - // Hash table for obtaining distinct (i.e. unique) arguments. - std::vector<std::unique_ptr<AggregationStateHashTableBase>> - distinctify_hashtables_; + // Non-trivial group-by/argument expressions that need to be evaluated. 
+ std::vector<std::unique_ptr<const Scalar>> non_trivial_expressions_; -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - // If all an aggregate's argument expressions are simply attributes in - // 'input_relation_', then this caches the attribute IDs of those arguments. - std::vector<std::vector<attribute_id>> arguments_as_attributes_; -#endif + std::vector<attribute_id> group_by_key_ids_; + std::vector<std::vector<attribute_id>> argument_ids_; + + std::vector<const Type *> group_by_types_; + + // Hash table for obtaining distinct (i.e. unique) arguments. +// std::vector<std::unique_ptr<AggregationStateHashTableBase>> +// distinctify_hashtables_; // Per-aggregate global states for aggregation without GROUP BY. std::vector<std::unique_ptr<AggregationState>> single_states_; @@ -303,6 +279,8 @@ class AggregationOperationState { std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_; + std::unique_ptr<AggregationStateHashTableBase> collision_free_hashtable_; + StorageManager *storage_manager_; DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index fddea1f..c7bc28f 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -165,6 +165,9 @@ if(QUICKSTEP_HAVE_BITWEAVING) bitweaving/BitWeavingVIndexSubBlock.hpp) endif() # CMAKE_VALIDATE_IGNORE_END +add_library(quickstep_storage_CollisionFreeAggregationStateHashTable + CollisionFreeAggregationStateHashTable.cpp + CollisionFreeAggregationStateHashTable.hpp) add_library(quickstep_storage_ColumnStoreUtil ColumnStoreUtil.cpp ColumnStoreUtil.hpp) add_library(quickstep_storage_CompressedBlockBuilder CompressedBlockBuilder.cpp CompressedBlockBuilder.hpp) add_library(quickstep_storage_CompressedColumnStoreTupleStorageSubBlock @@ -194,9 +197,6 @@ if (ENABLE_DISTRIBUTED) endif() add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp) -add_library(quickstep_storage_FastHashTable ../empty_src.cpp FastHashTable.hpp) -add_library(quickstep_storage_FastHashTableFactory ../empty_src.cpp FastHashTableFactory.hpp) -add_library(quickstep_storage_FastSeparateChainingHashTable ../empty_src.cpp FastSeparateChainingHashTable.hpp) add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp) if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS) add_library(quickstep_storage_FileManagerHdfs FileManagerHdfs.cpp FileManagerHdfs.hpp) @@ -225,6 +225,9 @@ add_library(quickstep_storage_InsertDestination_proto add_library(quickstep_storage_LinearOpenAddressingHashTable ../empty_src.cpp LinearOpenAddressingHashTable.hpp) +add_library(quickstep_storage_PackedPayloadAggregationStateHashTable + PackedPayloadAggregationStateHashTable.cpp + PackedPayloadAggregationStateHashTable.hpp) add_library(quickstep_storage_PartitionedHashTablePool ../empty_src.cpp PartitionedHashTablePool.hpp) add_library(quickstep_storage_PreloaderThread 
PreloaderThread.cpp PreloaderThread.hpp) add_library(quickstep_storage_SMAIndexSubBlock SMAIndexSubBlock.cpp SMAIndexSubBlock.hpp) @@ -276,22 +279,25 @@ target_link_libraries(quickstep_storage_AggregationOperationState quickstep_expressions_predicate_Predicate quickstep_expressions_scalar_Scalar quickstep_storage_AggregationOperationState_proto - quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory quickstep_storage_HashTablePool quickstep_storage_InsertDestination quickstep_storage_PartitionedHashTablePool + quickstep_storage_PackedPayloadAggregationStateHashTable quickstep_storage_StorageBlock quickstep_storage_StorageBlockInfo quickstep_storage_StorageManager + quickstep_storage_SubBlocksReference quickstep_storage_TupleIdSequence quickstep_storage_ValueAccessor + quickstep_storage_ValueAccessorUtil quickstep_types_TypedValue quickstep_types_containers_ColumnVector quickstep_types_containers_ColumnVectorsValueAccessor quickstep_types_containers_Tuple quickstep_utility_Macros + quickstep_utility_ConcurrentBitVector quickstep_utility_lipfilter_LIPFilterAdaptiveProber) target_link_libraries(quickstep_storage_AggregationOperationState_proto quickstep_expressions_Expressions_proto @@ -429,6 +435,24 @@ if(QUICKSTEP_HAVE_BITWEAVING) quickstep_utility_Macros) endif() # CMAKE_VALIDATE_IGNORE_END +target_link_libraries(quickstep_storage_CollisionFreeAggregationStateHashTable + quickstep_catalog_CatalogTypedefs + quickstep_expressions_aggregation_AggregationHandle + quickstep_expressions_aggregation_AggregationID + quickstep_storage_HashTableBase + quickstep_storage_StorageBlob + quickstep_storage_StorageBlockInfo + quickstep_storage_StorageConstants + quickstep_storage_StorageManager + quickstep_storage_ValueAccessor + quickstep_storage_ValueAccessorUtil + quickstep_types_Type + quickstep_types_TypeID + quickstep_types_TypedValue + quickstep_types_containers_ColumnVector + quickstep_types_containers_ColumnVectorsValueAccessor + 
quickstep_utility_ConcurrentBitVector + quickstep_utility_Macros) target_link_libraries(quickstep_storage_ColumnStoreUtil quickstep_catalog_CatalogAttribute quickstep_catalog_CatalogRelationSchema @@ -626,52 +650,6 @@ target_link_libraries(quickstep_storage_EvictionPolicy quickstep_threading_SpinMutex quickstep_threading_SpinSharedMutex quickstep_utility_Macros) -target_link_libraries(quickstep_storage_FastHashTable - quickstep_catalog_CatalogTypedefs - quickstep_storage_HashTableBase - quickstep_storage_StorageBlob - quickstep_storage_StorageBlockInfo - quickstep_storage_StorageConstants - quickstep_storage_StorageManager - quickstep_storage_TupleReference - quickstep_storage_ValueAccessor - quickstep_storage_ValueAccessorUtil - quickstep_threading_SpinMutex - quickstep_threading_SpinSharedMutex - quickstep_types_Type - quickstep_types_TypedValue - quickstep_utility_HashPair - quickstep_utility_Macros) -target_link_libraries(quickstep_storage_FastHashTableFactory - glog - quickstep_storage_FastHashTable - quickstep_storage_FastSeparateChainingHashTable - quickstep_storage_HashTable - quickstep_storage_HashTable_proto - quickstep_storage_HashTableBase - quickstep_storage_HashTableFactory - quickstep_storage_LinearOpenAddressingHashTable - quickstep_storage_SeparateChainingHashTable - quickstep_storage_SimpleScalarSeparateChainingHashTable - quickstep_storage_TupleReference - quickstep_types_TypeFactory - quickstep_utility_Macros) -target_link_libraries(quickstep_storage_FastSeparateChainingHashTable - quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_HashTable - quickstep_storage_HashTableBase - quickstep_storage_HashTableKeyManager - quickstep_storage_StorageBlob - quickstep_storage_StorageBlockInfo - quickstep_storage_StorageConstants - quickstep_storage_StorageManager - quickstep_threading_SpinSharedMutex - quickstep_types_Type - quickstep_types_TypedValue - quickstep_utility_Alignment - 
quickstep_utility_Macros - quickstep_utility_PrimeNumber) target_link_libraries(quickstep_storage_FileManager quickstep_storage_StorageBlockInfo quickstep_utility_Macros @@ -734,10 +712,12 @@ target_link_libraries(quickstep_storage_HashTable_proto ${PROTOBUF_LIBRARY}) target_link_libraries(quickstep_storage_HashTableFactory glog + quickstep_storage_CollisionFreeAggregationStateHashTable quickstep_storage_HashTable quickstep_storage_HashTable_proto quickstep_storage_HashTableBase quickstep_storage_LinearOpenAddressingHashTable + quickstep_storage_PackedPayloadAggregationStateHashTable quickstep_storage_SeparateChainingHashTable quickstep_storage_SimpleScalarSeparateChainingHashTable quickstep_storage_TupleReference @@ -757,9 +737,8 @@ target_link_libraries(quickstep_storage_HashTableKeyManager target_link_libraries(quickstep_storage_HashTablePool glog quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_FastHashTableFactory quickstep_storage_HashTableBase + quickstep_storage_HashTableFactory quickstep_threading_SpinMutex quickstep_utility_Macros quickstep_utility_StringUtil) @@ -817,12 +796,32 @@ target_link_libraries(quickstep_storage_LinearOpenAddressingHashTable quickstep_utility_Alignment quickstep_utility_Macros quickstep_utility_PrimeNumber) +target_link_libraries(quickstep_storage_PackedPayloadAggregationStateHashTable + quickstep_catalog_CatalogTypedefs + quickstep_expressions_aggregation_AggregationHandle + quickstep_storage_HashTableBase + quickstep_storage_HashTableKeyManager + quickstep_storage_StorageBlob + quickstep_storage_StorageBlockInfo + quickstep_storage_StorageConstants + quickstep_storage_StorageManager + quickstep_storage_TupleReference + quickstep_storage_ValueAccessor + quickstep_storage_ValueAccessorUtil + quickstep_threading_SpinMutex + quickstep_threading_SpinSharedMutex + quickstep_types_Type + quickstep_types_TypedValue + quickstep_types_containers_ColumnVectorsValueAccessor + 
quickstep_utility_Alignment + quickstep_utility_HashPair + quickstep_utility_Macros + quickstep_utility_PrimeNumber) target_link_libraries(quickstep_storage_PartitionedHashTablePool glog quickstep_expressions_aggregation_AggregationHandle - quickstep_storage_FastHashTable - quickstep_storage_FastHashTableFactory quickstep_storage_HashTableBase + quickstep_storage_HashTableFactory quickstep_utility_Macros quickstep_utility_StringUtil) target_link_libraries(quickstep_storage_PreloaderThread @@ -933,7 +932,6 @@ target_link_libraries(quickstep_storage_StorageBlock glog quickstep_catalog_CatalogRelationSchema quickstep_catalog_CatalogTypedefs - quickstep_expressions_aggregation_AggregationHandle quickstep_expressions_predicate_Predicate quickstep_expressions_scalar_Scalar quickstep_storage_BasicColumnStoreTupleStorageSubBlock @@ -942,7 +940,6 @@ target_link_libraries(quickstep_storage_StorageBlock quickstep_storage_CompressedColumnStoreTupleStorageSubBlock quickstep_storage_CompressedPackedRowStoreTupleStorageSubBlock quickstep_storage_CountedReference - quickstep_storage_HashTableBase quickstep_storage_IndexSubBlock quickstep_storage_InsertDestinationInterface quickstep_storage_SMAIndexSubBlock @@ -1111,6 +1108,7 @@ target_link_libraries(quickstep_storage quickstep_storage_BasicColumnStoreValueAccessor quickstep_storage_BloomFilterIndexSubBlock quickstep_storage_CSBTreeIndexSubBlock + quickstep_storage_CollisionFreeAggregationStateHashTable quickstep_storage_ColumnStoreUtil quickstep_storage_CompressedBlockBuilder quickstep_storage_CompressedColumnStoreTupleStorageSubBlock @@ -1123,9 +1121,6 @@ target_link_libraries(quickstep_storage quickstep_storage_EvictionPolicy quickstep_storage_FileManager quickstep_storage_FileManagerLocal - quickstep_storage_FastHashTable - quickstep_storage_FastHashTableFactory - quickstep_storage_FastSeparateChainingHashTable quickstep_storage_HashTable quickstep_storage_HashTable_proto quickstep_storage_HashTableBase @@ -1139,6 +1134,7 @@ 
target_link_libraries(quickstep_storage quickstep_storage_InsertDestination_proto quickstep_storage_LinearOpenAddressingHashTable quickstep_storage_PartitionedHashTablePool + quickstep_storage_PackedPayloadAggregationStateHashTable quickstep_storage_PreloaderThread quickstep_storage_SMAIndexSubBlock quickstep_storage_SeparateChainingHashTable http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/storage/CollisionFreeAggregationStateHashTable.cpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeAggregationStateHashTable.cpp b/storage/CollisionFreeAggregationStateHashTable.cpp new file mode 100644 index 0000000..15d4dfe --- /dev/null +++ b/storage/CollisionFreeAggregationStateHashTable.cpp @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/ + +#include "storage/CollisionFreeAggregationStateHashTable.hpp" + +#include <algorithm> +#include <atomic> +#include <cstddef> +#include <cstdint> +#include <cstdlib> +#include <map> +#include <memory> +#include <vector> + +#include "storage/StorageBlockInfo.hpp" +#include "storage/StorageManager.hpp" +#include "storage/ValueAccessor.hpp" +#include "storage/ValueAccessorUtil.hpp" +#include "types/containers/ColumnVectorsValueAccessor.hpp" + +namespace quickstep { + +CollisionFreeAggregationStateHashTable::CollisionFreeAggregationStateHashTable( + const std::vector<const Type *> &key_types, + const std::size_t num_entries, + const std::vector<AggregationHandle *> &handles, + StorageManager *storage_manager) + : key_type_(key_types.front()), + num_entries_(num_entries), + num_handles_(handles.size()), + handles_(handles), + num_finalize_partitions_(std::min((num_entries_ >> 12u) + 1u, 80uL)), + storage_manager_(storage_manager) { + CHECK_EQ(1u, key_types.size()); + DCHECK_GT(num_entries, 0u); + + std::map<std::string, std::size_t> memory_offsets; + std::size_t required_memory = 0; + + memory_offsets.emplace("existence_map", required_memory); + required_memory += + CacheLineAlignedBytes(ConcurrentBitVector::BytesNeeded(num_entries)); + + for (std::size_t i = 0; i < num_handles_; ++i) { + const AggregationHandle *handle = handles_[i]; + const std::vector<const Type *> argument_types = handle->getArgumentTypes(); + + std::size_t state_size = 0; + switch (handle->getAggregationID()) { + case AggregationID::kCount: { + state_size = sizeof(std::atomic<std::size_t>); + break; + } + case AggregationID::kSum: { + CHECK_EQ(1u, argument_types.size()); + switch (argument_types.front()->getTypeID()) { + case TypeID::kInt: // Fall through + case TypeID::kLong: + state_size = sizeof(std::atomic<std::int64_t>); + break; + case TypeID::kFloat: // Fall through + case TypeID::kDouble: + state_size = sizeof(std::atomic<double>); + break; + default: + LOG(FATAL) << "Not 
implemented"; + } + break; + } + default: + LOG(FATAL) << "Not implemented"; + } + + memory_offsets.emplace(std::string("state") + std::to_string(i), + required_memory); + required_memory += CacheLineAlignedBytes(state_size * num_entries); + } + + const std::size_t num_storage_slots = + storage_manager_->SlotsNeededForBytes(required_memory); + + const block_id blob_id = storage_manager_->createBlob(num_storage_slots); + blob_ = storage_manager_->getBlobMutable(blob_id); + + void *memory_start = blob_->getMemoryMutable(); + existence_map_.reset(new ConcurrentBitVector( + reinterpret_cast<char *>(memory_start) + memory_offsets.at("existence_map"), + num_entries, + false /* initialize */)); + + for (std::size_t i = 0; i < num_handles_; ++i) { + vec_tables_.emplace_back( + reinterpret_cast<char *>(memory_start) + + memory_offsets.at(std::string("state") + std::to_string(i))); + } + + memory_size_ = required_memory; + num_init_partitions_ = std::min(memory_size_ / (4uL * 1024u * 1024u), 80uL); +} + +CollisionFreeAggregationStateHashTable::~CollisionFreeAggregationStateHashTable() { + const block_id blob_id = blob_->getID(); + blob_.release(); + storage_manager_->deleteBlockOrBlobFile(blob_id); +} + +void CollisionFreeAggregationStateHashTable::destroyPayload() { +} + +bool CollisionFreeAggregationStateHashTable::upsertValueAccessor( + const std::vector<std::vector<attribute_id>> &argument_ids, + const std::vector<attribute_id> &key_attr_ids, + ValueAccessor *base_accessor, + ColumnVectorsValueAccessor *aux_accessor) { + DCHECK_EQ(1u, key_attr_ids.size()); + + const attribute_id key_attr_id = key_attr_ids.front(); + const bool is_key_nullable = key_type_->isNullable(); + + for (std::size_t i = 0; i < num_handles_; ++i) { + DCHECK_LE(argument_ids[i].size(), 1u); + + const attribute_id argument_id = + argument_ids[i].empty() ? 
kInvalidAttributeID : argument_ids[i].front(); + + const AggregationHandle *handle = handles_[i]; + const auto &argument_types = handle->getArgumentTypes(); + + const Type *argument_type; + bool is_argument_nullable; + if (argument_types.empty()) { + argument_type = nullptr; + is_argument_nullable = false; + } else { + argument_type = argument_types.front(); + is_argument_nullable = argument_type->isNullable(); + } + + InvokeOnValueAccessorMaybeTupleIdSequenceAdapter( + base_accessor, + [&](auto *accessor) -> void { // NOLINT(build/c++11) + if (key_attr_id >= 0) { + if (argument_id >= 0) { + upsertValueAccessorDispatchHelper<false>(is_key_nullable, + is_argument_nullable, + key_type_, + argument_type, + handle->getAggregationID(), + key_attr_id, + argument_id, + vec_tables_[i], + accessor, + accessor); + } else { + upsertValueAccessorDispatchHelper<true>(is_key_nullable, + is_argument_nullable, + key_type_, + argument_type, + handle->getAggregationID(), + key_attr_id, + -(argument_id+2), + vec_tables_[i], + accessor, + aux_accessor); + } + } else { + if (argument_id >= 0) { + upsertValueAccessorDispatchHelper<true>(is_key_nullable, + is_argument_nullable, + key_type_, + argument_type, + handle->getAggregationID(), + -(key_attr_id+2), + argument_id, + vec_tables_[i], + aux_accessor, + accessor); + } else { + upsertValueAccessorDispatchHelper<false>(is_key_nullable, + is_argument_nullable, + key_type_, + argument_type, + handle->getAggregationID(), + -(key_attr_id+2), + -(argument_id+2), + vec_tables_[i], + aux_accessor, + aux_accessor); + } + } + }); + } + return true; +} + +void CollisionFreeAggregationStateHashTable::finalizeKey( + const std::size_t partition_id, + NativeColumnVector *output_cv) const { + const std::size_t start_position = + calculatePartitionStartPosition(partition_id); + const std::size_t end_position = + calculatePartitionEndPosition(partition_id); + + switch (key_type_->getTypeID()) { + case TypeID::kInt: + 
finalizeKeyInternal<int>(start_position, end_position, output_cv); + return; + case TypeID::kLong: + finalizeKeyInternal<std::int64_t>(start_position, end_position, output_cv); + return; + default: + LOG(FATAL) << "Not supported"; + } +} + +void CollisionFreeAggregationStateHashTable::finalizeState( + const std::size_t partition_id, + std::size_t handle_id, + NativeColumnVector *output_cv) const { + const std::size_t start_position = + calculatePartitionStartPosition(partition_id); + const std::size_t end_position = + calculatePartitionEndPosition(partition_id); + + const AggregationHandle *handle = handles_[handle_id]; + const auto &argument_types = handle->getArgumentTypes(); + const Type *argument_type = + argument_types.empty() ? nullptr : argument_types.front(); + + finalizeStateDispatchHelper(handle->getAggregationID(), + argument_type, + vec_tables_[handle_id], + start_position, + end_position, + output_cv); +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/34ea858d/storage/CollisionFreeAggregationStateHashTable.hpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeAggregationStateHashTable.hpp b/storage/CollisionFreeAggregationStateHashTable.hpp new file mode 100644 index 0000000..f3edfd8 --- /dev/null +++ b/storage/CollisionFreeAggregationStateHashTable.hpp @@ -0,0 +1,568 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_ +#define QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_ + +#include <atomic> +#include <cstddef> +#include <cstdint> +#include <cstring> +#include <memory> +#include <type_traits> +#include <utility> +#include <vector> + +#include "catalog/CatalogTypedefs.hpp" +#include "expressions/aggregation/AggregationHandle.hpp" +#include "expressions/aggregation/AggregationID.hpp" +#include "storage/HashTableBase.hpp" +#include "storage/StorageBlob.hpp" +#include "storage/StorageConstants.hpp" +#include "storage/ValueAccessor.hpp" +#include "types/Type.hpp" +#include "types/TypeID.hpp" +#include "types/TypedValue.hpp" +#include "types/containers/ColumnVector.hpp" +#include "utility/ConcurrentBitVector.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +class ColumnVectorsValueAccessor; +class StorageManager; + +/** \addtogroup Storage + * @{ + */ + +class CollisionFreeAggregationStateHashTable : public AggregationStateHashTableBase { + public: + CollisionFreeAggregationStateHashTable( + const std::vector<const Type *> &key_types, + const std::size_t num_entries, + const std::vector<AggregationHandle *> &handles, + StorageManager *storage_manager); + + ~CollisionFreeAggregationStateHashTable() override; + + void destroyPayload() override; + + inline std::size_t getNumInitializationPartitions() const { + return num_init_partitions_; + } + + inline std::size_t getNumFinalizationPartitions() const { + return
num_finalize_partitions_; + } + + inline std::size_t getNumTuplesInPartition( + const std::size_t partition_id) const { + const std::size_t start_position = + calculatePartitionStartPosition(partition_id); + const std::size_t end_position = + calculatePartitionEndPosition(partition_id); + return existence_map_->onesCount(start_position, end_position); + } + + inline void initialize(const std::size_t partition_id) { + const std::size_t memory_segment_size = + (memory_size_ + num_init_partitions_ - 1) / num_init_partitions_; + const std::size_t memory_start = memory_segment_size * partition_id; + std::memset(reinterpret_cast<char *>(blob_->getMemoryMutable()) + memory_start, + 0, + std::min(memory_segment_size, memory_size_ - memory_start)); + } + + bool upsertValueAccessor( + const std::vector<std::vector<attribute_id>> &argument_ids, + const std::vector<attribute_id> &key_attr_ids, + ValueAccessor *base_accessor, + ColumnVectorsValueAccessor *aux_accessor = nullptr) override; + + void finalizeKey(const std::size_t partition_id, + NativeColumnVector *output_cv) const; + + void finalizeState(const std::size_t partition_id, + std::size_t handle_id, + NativeColumnVector *output_cv) const; + + private: + inline static std::size_t CacheLineAlignedBytes(const std::size_t actual_bytes) { + return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes; + } + + inline std::size_t calculatePartitionLength() const { + const std::size_t partition_length = + (num_entries_ + num_finalize_partitions_ - 1) / num_finalize_partitions_; + DCHECK_GE(partition_length, 0u); + return partition_length; + } + + inline std::size_t calculatePartitionStartPosition( + const std::size_t partition_id) const { + return calculatePartitionLength() * partition_id; + } + + inline std::size_t calculatePartitionEndPosition( + const std::size_t partition_id) const { + return std::min(calculatePartitionLength() * (partition_id + 1), + num_entries_); + } + + template <bool 
use_two_accessors, typename ...ArgTypes> + inline void upsertValueAccessorDispatchHelper( + const bool is_key_nullable, + const bool is_argument_nullable, + ArgTypes &&...args); + + template <bool ...bool_values, typename ...ArgTypes> + inline void upsertValueAccessorDispatchHelper( + const Type *key_type, + ArgTypes &&...args); + + template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename ...ArgTypes> + inline void upsertValueAccessorDispatchHelper( + const Type *argument_type, + const AggregationID agg_id, + ArgTypes &&...args); + + template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> + inline void upsertValueAccessorCountHelper( + const attribute_id key_attr_id, + const attribute_id argument_id, + void *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor); + + template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> + inline void upsertValueAccessorSumHelper( + const Type *argument_type, + const attribute_id key_attr_id, + const attribute_id argument_id, + void *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor); + + template <bool is_key_nullable, typename KeyT, typename KeyValueAccessorT> + inline void upsertValueAccessorCountNullary( + const attribute_id key_attr_id, + std::atomic<std::size_t> *vec_table, + KeyValueAccessorT *key_accessor); + + template <bool use_two_accessors, bool is_key_nullable, typename KeyT, + typename KeyValueAccessorT, typename ArgumentValueAccessorT> + inline void upsertValueAccessorCountUnary( + const attribute_id key_attr_id, + const attribute_id argument_id, + std::atomic<std::size_t> *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor); + + template <bool 
use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename ArgumentT, typename StateT, + typename KeyValueAccessorT, typename ArgumentValueAccessorT> + inline void upsertValueAccessorIntegerSum( + const attribute_id key_attr_id, + const attribute_id argument_id, + std::atomic<StateT> *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor); + + template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename ArgumentT, typename StateT, + typename KeyValueAccessorT, typename ArgumentValueAccessorT> + inline void upsertValueAccessorGenericSum( + const attribute_id key_attr_id, + const attribute_id argument_id, + std::atomic<StateT> *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor); + + template <typename KeyT> + inline void finalizeKeyInternal(const std::size_t start_position, + const std::size_t end_position, + NativeColumnVector *output_cv) const { + std::size_t loc = start_position - 1; + while ((loc = existence_map_->nextOne(loc)) < end_position) { + *static_cast<KeyT *>(output_cv->getPtrForDirectWrite()) = loc; + } + } + + template <typename ...ArgTypes> + inline void finalizeStateDispatchHelper(const AggregationID agg_id, + const Type *argument_type, + const void *vec_table, + ArgTypes &&...args) const { + switch (agg_id) { + case AggregationID::kCount: + finalizeStateCount(static_cast<const std::atomic<std::size_t> *>(vec_table), + std::forward<ArgTypes>(args)...); + return; + case AggregationID::kSum: + finalizeStateSumHelper(argument_type, + vec_table, + std::forward<ArgTypes>(args)...); + return; + default: + LOG(FATAL) << "Not supported"; + } + } + + template <typename ...ArgTypes> + inline void finalizeStateSumHelper(const Type *argument_type, + const void *vec_table, + ArgTypes &&...args) const { + DCHECK(argument_type != nullptr); + + switch (argument_type->getTypeID()) { + case TypeID::kInt: // Fall 
through + case TypeID::kLong: + finalizeStateSum<std::int64_t>( + static_cast<const std::atomic<std::int64_t> *>(vec_table), + std::forward<ArgTypes>(args)...); + return; + case TypeID::kFloat: // Fall through + case TypeID::kDouble: + finalizeStateSum<double>( + static_cast<const std::atomic<double> *>(vec_table), + std::forward<ArgTypes>(args)...); + return; + default: + LOG(FATAL) << "Not supported"; + } + } + + inline void finalizeStateCount(const std::atomic<std::size_t> *vec_table, + const std::size_t start_position, + const std::size_t end_position, + NativeColumnVector *output_cv) const { + std::size_t loc = start_position - 1; + while ((loc = existence_map_->nextOne(loc)) < end_position) { + *static_cast<std::int64_t *>(output_cv->getPtrForDirectWrite()) = + vec_table[loc].load(std::memory_order_relaxed); + } + } + + template <typename ResultT, typename StateT> + inline void finalizeStateSum(const std::atomic<StateT> *vec_table, + const std::size_t start_position, + const std::size_t end_position, + NativeColumnVector *output_cv) const { + std::size_t loc = start_position - 1; + while ((loc = existence_map_->nextOne(loc)) < end_position) { + *static_cast<ResultT *>(output_cv->getPtrForDirectWrite()) = + vec_table[loc].load(std::memory_order_relaxed); + } + } + + const Type *key_type_; + const std::size_t num_entries_; + + const std::size_t num_handles_; + const std::vector<AggregationHandle *> handles_; + + std::unique_ptr<ConcurrentBitVector> existence_map_; + std::vector<void *> vec_tables_; + + const std::size_t num_finalize_partitions_; + + StorageManager *storage_manager_; + MutableBlobReference blob_; + + std::size_t memory_size_; + std::size_t num_init_partitions_; + + DISALLOW_COPY_AND_ASSIGN(CollisionFreeAggregationStateHashTable); +}; + +// ---------------------------------------------------------------------------- +// Implementations of template methods follow. 
+ +template <bool use_two_accessors, typename ...ArgTypes> +inline void CollisionFreeAggregationStateHashTable + ::upsertValueAccessorDispatchHelper( + const bool is_key_nullable, + const bool is_argument_nullable, + ArgTypes &&...args) { + if (is_key_nullable) { + if (is_argument_nullable) { + upsertValueAccessorDispatchHelper<use_two_accessors, true, true>( + std::forward<ArgTypes>(args)...); + } else { + upsertValueAccessorDispatchHelper<use_two_accessors, true, false>( + std::forward<ArgTypes>(args)...); + } + } else { + if (is_argument_nullable) { + upsertValueAccessorDispatchHelper<use_two_accessors, false, true>( + std::forward<ArgTypes>(args)...); + } else { + upsertValueAccessorDispatchHelper<use_two_accessors, false, false>( + std::forward<ArgTypes>(args)...); + } + } +} + +template <bool ...bool_values, typename ...ArgTypes> +inline void CollisionFreeAggregationStateHashTable + ::upsertValueAccessorDispatchHelper( + const Type *key_type, + ArgTypes &&...args) { + switch (key_type->getTypeID()) { + case TypeID::kInt: + upsertValueAccessorDispatchHelper<bool_values..., int>( + std::forward<ArgTypes>(args)...); + return; + case TypeID::kLong: + upsertValueAccessorDispatchHelper<bool_values..., std::int64_t>( + std::forward<ArgTypes>(args)...); + return; + default: + LOG(FATAL) << "Not supported"; + } +} + +template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename ...ArgTypes> +inline void CollisionFreeAggregationStateHashTable + ::upsertValueAccessorDispatchHelper( + const Type *argument_type, + const AggregationID agg_id, + ArgTypes &&...args) { + switch (agg_id) { + case AggregationID::kCount: + upsertValueAccessorCountHelper< + use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>( + std::forward<ArgTypes>(args)...); + return; + case AggregationID::kSum: + upsertValueAccessorSumHelper< + use_two_accessors, is_key_nullable, is_argument_nullable, KeyT>( + argument_type, 
std::forward<ArgTypes>(args)...); + return; + default: + LOG(FATAL) << "Not supported"; + } +} + +template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> +inline void CollisionFreeAggregationStateHashTable + ::upsertValueAccessorCountHelper( + const attribute_id key_attr_id, + const attribute_id argument_id, + void *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor) { + DCHECK_GE(key_attr_id, 0u); + + if (is_argument_nullable && argument_id != kInvalidAttributeID) { + upsertValueAccessorCountUnary<use_two_accessors, is_key_nullable, KeyT>( + key_attr_id, + argument_id, + static_cast<std::atomic<std::size_t> *>(vec_table), + key_accessor, + argument_accessor); + return; + } else { + upsertValueAccessorCountNullary<is_key_nullable, KeyT>( + key_attr_id, + static_cast<std::atomic<std::size_t> *>(vec_table), + key_accessor); + return; + } +} + +template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename KeyValueAccessorT, typename ArgumentValueAccessorT> +inline void CollisionFreeAggregationStateHashTable + ::upsertValueAccessorSumHelper( + const Type *argument_type, + const attribute_id key_attr_id, + const attribute_id argument_id, + void *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor) { + DCHECK_GE(key_attr_id, 0u); + DCHECK_GE(argument_id, 0u); + DCHECK(argument_type != nullptr); + + switch (argument_type->getTypeID()) { + case TypeID::kInt: + upsertValueAccessorIntegerSum< + use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, int>( + key_attr_id, + argument_id, + static_cast<std::atomic<std::int64_t> *>(vec_table), + key_accessor, + argument_accessor); + return; + case TypeID::kLong: + upsertValueAccessorIntegerSum< + use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, std::int64_t>( + key_attr_id, + 
argument_id, + static_cast<std::atomic<std::int64_t> *>(vec_table), + key_accessor, + argument_accessor); + return; + case TypeID::kFloat: + upsertValueAccessorGenericSum< + use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, float>( + key_attr_id, + argument_id, + static_cast<std::atomic<double> *>(vec_table), + key_accessor, + argument_accessor); + return; + case TypeID::kDouble: + upsertValueAccessorGenericSum< + use_two_accessors, is_key_nullable, is_argument_nullable, KeyT, double>( + key_attr_id, + argument_id, + static_cast<std::atomic<double> *>(vec_table), + key_accessor, + argument_accessor); + return; + default: + LOG(FATAL) << "Not supported"; + } +} + +template <bool is_key_nullable, typename KeyT, typename ValueAccessorT> +inline void CollisionFreeAggregationStateHashTable + ::upsertValueAccessorCountNullary( + const attribute_id key_attr_id, + std::atomic<std::size_t> *vec_table, + ValueAccessorT *accessor) { + accessor->beginIteration(); + while (accessor->next()) { + const KeyT *key = static_cast<const KeyT *>( + accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); + if (is_key_nullable && key == nullptr) { + continue; + } + const std::size_t loc = *key; + vec_table[loc].fetch_add(1u, std::memory_order_relaxed); + existence_map_->setBit(loc); + } +} + +template <bool use_two_accessors, bool is_key_nullable, typename KeyT, + typename KeyValueAccessorT, typename ArgumentValueAccessorT> +inline void CollisionFreeAggregationStateHashTable + ::upsertValueAccessorCountUnary( + const attribute_id key_attr_id, + const attribute_id argument_id, + std::atomic<std::size_t> *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor) { + key_accessor->beginIteration(); + while (key_accessor->next()) { + if (use_two_accessors) { + argument_accessor->next(); + } + const KeyT *key = static_cast<const KeyT *>( + key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); + if (is_key_nullable && 
key == nullptr) { + continue; + } + const std::size_t loc = *key; + existence_map_->setBit(loc); + if (argument_accessor->getUntypedValue(argument_id) == nullptr) { + continue; + } + vec_table[loc].fetch_add(1u, std::memory_order_relaxed); + } +} + +template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename ArgumentT, typename StateT, + typename KeyValueAccessorT, typename ArgumentValueAccessorT> +inline void CollisionFreeAggregationStateHashTable + ::upsertValueAccessorIntegerSum( + const attribute_id key_attr_id, + const attribute_id argument_id, + std::atomic<StateT> *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor) { + key_accessor->beginIteration(); + while (key_accessor->next()) { + if (use_two_accessors) { + argument_accessor->next(); + } + const KeyT *key = static_cast<const KeyT *>( + key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); + if (is_key_nullable && key == nullptr) { + continue; + } + const std::size_t loc = *key; + existence_map_->setBit(loc); + const ArgumentT *argument = static_cast<const ArgumentT *>( + argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id)); + if (is_argument_nullable && argument == nullptr) { + continue; + } + vec_table[loc].fetch_add(*argument, std::memory_order_relaxed); + } +} + +template <bool use_two_accessors, bool is_key_nullable, bool is_argument_nullable, + typename KeyT, typename ArgumentT, typename StateT, + typename KeyValueAccessorT, typename ArgumentValueAccessorT> +inline void CollisionFreeAggregationStateHashTable + ::upsertValueAccessorGenericSum( + const attribute_id key_attr_id, + const attribute_id argument_id, + std::atomic<StateT> *vec_table, + KeyValueAccessorT *key_accessor, + ArgumentValueAccessorT *argument_accessor) { + key_accessor->beginIteration(); + while (key_accessor->next()) { + if (use_two_accessors) { + argument_accessor->next(); + } + const KeyT *key = 
static_cast<const KeyT *>( + key_accessor->template getUntypedValue<is_key_nullable>(key_attr_id)); + if (is_key_nullable && key == nullptr) { + continue; + } + const std::size_t loc = *key; + existence_map_->setBit(loc); + const ArgumentT *argument = static_cast<const ArgumentT *>( + argument_accessor->template getUntypedValue<is_argument_nullable>(argument_id)); + if (is_argument_nullable && argument == nullptr) { + continue; + } + const ArgumentT arg_val = *argument; + std::atomic<StateT> &state = vec_table[loc]; + StateT state_val = state.load(std::memory_order_relaxed); + while(!state.compare_exchange_weak(state_val, state_val + arg_val)) {} + } +} + +} // namespace quickstep + +#endif // QUICKSTEP_STORAGE_COLLISION_FREE_AGGREGATION_STATE_HASH_TABLE_HPP_