- Adds CollisionFreeVectorTable to support a specialized fast-path aggregation for a range-bounded, single-integer group-by key. - Supports copy elision for aggregation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2d89e4fb Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2d89e4fb Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2d89e4fb Branch: refs/heads/reorder-partitioned-hash-join Commit: 2d89e4fbf3b51d7768d928d56b25ec0ab52faabb Parents: ec76096 Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Mon Jan 30 14:46:39 2017 -0600 Committer: Jianqiao Zhu <jianq...@cs.wisc.edu> Committed: Tue Feb 7 15:29:19 2017 -0600 ---------------------------------------------------------------------- .../aggregation/AggregateFunctionCount.cpp | 6 +- .../aggregation/AggregationConcreteHandle.cpp | 32 +- .../aggregation/AggregationConcreteHandle.hpp | 160 +- expressions/aggregation/AggregationHandle.hpp | 193 +- .../aggregation/AggregationHandleAvg.cpp | 95 +- .../aggregation/AggregationHandleAvg.hpp | 115 +- .../aggregation/AggregationHandleCount.cpp | 147 +- .../aggregation/AggregationHandleCount.hpp | 154 +- .../aggregation/AggregationHandleDistinct.cpp | 81 - .../aggregation/AggregationHandleDistinct.hpp | 130 - .../aggregation/AggregationHandleMax.cpp | 91 +- .../aggregation/AggregationHandleMax.hpp | 111 +- .../aggregation/AggregationHandleMin.cpp | 91 +- .../aggregation/AggregationHandleMin.hpp | 120 +- .../aggregation/AggregationHandleSum.cpp | 93 +- .../aggregation/AggregationHandleSum.hpp | 114 +- expressions/aggregation/CMakeLists.txt | 58 +- .../tests/AggregationHandleAvg_unittest.cpp | 110 +- .../tests/AggregationHandleCount_unittest.cpp | 145 +- .../tests/AggregationHandleMax_unittest.cpp | 158 +- .../tests/AggregationHandleMin_unittest.cpp | 158 +- .../tests/AggregationHandleSum_unittest.cpp | 109 +- query_execution/QueryContext.hpp | 14 - query_optimizer/CMakeLists.txt | 4 + query_optimizer/ExecutionGenerator.cpp | 147 +- query_optimizer/ExecutionGenerator.hpp | 20 +- 
relational_operators/CMakeLists.txt | 15 + .../DestroyAggregationStateOperator.cpp | 7 - .../FinalizeAggregationOperator.cpp | 16 +- .../FinalizeAggregationOperator.hpp | 14 +- .../InitializeAggregationOperator.cpp | 72 + .../InitializeAggregationOperator.hpp | 122 + relational_operators/WorkOrderFactory.cpp | 3 + storage/AggregationOperationState.cpp | 834 +++--- storage/AggregationOperationState.hpp | 178 +- storage/CMakeLists.txt | 131 +- storage/CollisionFreeVectorTable.cpp | 285 +++ storage/CollisionFreeVectorTable.hpp | 730 ++++++ storage/FastHashTable.hpp | 2403 ------------------ storage/FastHashTableFactory.hpp | 224 -- storage/FastSeparateChainingHashTable.hpp | 1551 ----------- storage/HashTable.proto | 7 +- storage/HashTableBase.hpp | 42 +- storage/HashTableFactory.hpp | 63 +- storage/HashTablePool.hpp | 79 +- storage/PackedPayloadHashTable.cpp | 463 ++++ storage/PackedPayloadHashTable.hpp | 995 ++++++++ storage/PartitionedHashTablePool.hpp | 56 +- storage/StorageBlock.cpp | 274 +- storage/StorageBlock.hpp | 167 -- storage/ValueAccessorMultiplexer.hpp | 145 ++ .../BarrieredReadWriteConcurrentBitVector.hpp | 282 ++ utility/CMakeLists.txt | 7 + 53 files changed, 4722 insertions(+), 7099 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregateFunctionCount.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregateFunctionCount.cpp b/expressions/aggregation/AggregateFunctionCount.cpp index 466ff2f..9795b4a 100644 --- a/expressions/aggregation/AggregateFunctionCount.cpp +++ b/expressions/aggregation/AggregateFunctionCount.cpp @@ -53,16 +53,16 @@ AggregationHandle* AggregateFunctionCount::createHandle( if (argument_types.empty()) { // COUNT(*) - return new AggregationHandleCount<true, false>(); + return new AggregationHandleCount<true, false>(nullptr); } else if 
(argument_types.front()->isNullable()) { // COUNT(some_nullable_argument) - return new AggregationHandleCount<false, true>(); + return new AggregationHandleCount<false, true>(argument_types.front()); } else { // COUNT(non_nullable_argument) // // TODO(chasseur): Modify query optimizer to optimize-away COUNT with a // non-nullable argument and convert it to COUNT(*). - return new AggregationHandleCount<false, false>(); + return new AggregationHandleCount<false, false>(argument_types.front()); } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationConcreteHandle.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp index e3fb520..fa21056 100644 --- a/expressions/aggregation/AggregationConcreteHandle.cpp +++ b/expressions/aggregation/AggregationConcreteHandle.cpp @@ -22,16 +22,14 @@ #include <cstddef> #include <vector> -#include "catalog/CatalogTypedefs.hpp" -#include "storage/FastHashTable.hpp" -#include "storage/HashTable.hpp" #include "storage/HashTableFactory.hpp" +#include "storage/PackedPayloadHashTable.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" namespace quickstep { class StorageManager; class Type; -class ValueAccessor; AggregationStateHashTableBase* AggregationConcreteHandle::createDistinctifyHashTable( const HashTableImplType hash_table_impl, @@ -39,30 +37,26 @@ AggregationStateHashTableBase* AggregationConcreteHandle::createDistinctifyHashT const std::size_t estimated_num_distinct_keys, StorageManager *storage_manager) const { // Create a hash table with key types as key_types and value type as bool. 
- return AggregationStateHashTableFactory<bool>::CreateResizable( + return AggregationStateHashTableFactory::CreateResizable( hash_table_impl, key_types, estimated_num_distinct_keys, + {}, storage_manager); } void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &key_ids, + const std::vector<MultiSourceAttributeId> &argument_ids, + const std::vector<MultiSourceAttributeId> &key_ids, + const ValueAccessorMultiplexer &accessor_mux, AggregationStateHashTableBase *distinctify_hash_table) const { - // If the key-value pair is already there, we don't need to update the value, - // which should always be "true". I.e. the value is just a placeholder. - - AggregationStateFastHashTable *hash_table = - static_cast<AggregationStateFastHashTable *>(distinctify_hash_table); - if (key_ids.size() == 1) { - hash_table->upsertValueAccessorFast( - key_ids, accessor, key_ids[0], true /* check_for_null_keys */); - } else { - std::vector<attribute_id> empty_args {kInvalidAttributeID}; - hash_table->upsertValueAccessorCompositeKeyFast( - empty_args, accessor, key_ids, true /* check_for_null_keys */); + std::vector<MultiSourceAttributeId> concatenated_ids(key_ids); + for (const MultiSourceAttributeId &arg_id : argument_ids) { + concatenated_ids.emplace_back(arg_id); } + + static_cast<PackedPayloadHashTable *>(distinctify_hash_table) + ->upsertValueAccessorCompositeKey({}, concatenated_ids, accessor_mux); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationConcreteHandle.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp index 398a032..2287ca1 100644 --- a/expressions/aggregation/AggregationConcreteHandle.hpp +++ b/expressions/aggregation/AggregationConcreteHandle.hpp @@ -21,14 
+21,15 @@ #define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_ #include <cstddef> +#include <cstdint> #include <utility> #include <vector> -#include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationHandle.hpp" -#include "storage/FastHashTable.hpp" -#include "storage/HashTable.hpp" +#include "expressions/aggregation/AggregationID.hpp" #include "storage/HashTableBase.hpp" +#include "storage/PackedPayloadHashTable.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" #include "threading/SpinMutex.hpp" #include "types/TypedValue.hpp" #include "types/containers/ColumnVector.hpp" @@ -40,7 +41,6 @@ namespace quickstep { class StorageManager; class Type; -class ValueAccessor; /** \addtogroup Expressions * @{ @@ -51,7 +51,7 @@ class ValueAccessor; * merging two group by hash tables. **/ template <typename HandleT> -class HashTableStateUpserterFast { +class HashTableStateUpserter { public: /** * @brief Constructor. @@ -61,8 +61,8 @@ class HashTableStateUpserterFast { * table. The corresponding state (for the same key) in the destination * hash table will be upserted. **/ - HashTableStateUpserterFast(const HandleT &handle, - const std::uint8_t *source_state) + HashTableStateUpserter(const HandleT &handle, + const std::uint8_t *source_state) : handle_(handle), source_state_(source_state) {} /** @@ -72,14 +72,14 @@ class HashTableStateUpserterFast { * table that is being upserted. 
**/ void operator()(std::uint8_t *destination_state) { - handle_.mergeStatesFast(source_state_, destination_state); + handle_.mergeStates(source_state_, destination_state); } private: const HandleT &handle_; const std::uint8_t *source_state_; - DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserterFast); + DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserter); }; /** @@ -93,74 +93,62 @@ class HashTableStateUpserterFast { **/ class AggregationConcreteHandle : public AggregationHandle { public: - /** - * @brief Default implementaion for AggregationHandle::accumulateNullary(). - */ - AggregationState* accumulateNullary( - const std::size_t num_tuples) const override { - LOG(FATAL) << "Called accumulateNullary on an AggregationHandle that " - << "takes at least one argument."; - } - - /** - * @brief Implementaion for AggregationHandle::createDistinctifyHashTable() - * that creates a new HashTable for the distinctify step for - * DISTINCT aggregation. - */ AggregationStateHashTableBase* createDistinctifyHashTable( const HashTableImplType hash_table_impl, const std::vector<const Type *> &key_types, const std::size_t estimated_num_distinct_keys, StorageManager *storage_manager) const override; - /** - * @brief Implementaion for - * AggregationHandle::insertValueAccessorIntoDistinctifyHashTable() - * that inserts the GROUP BY expressions and aggregation arguments together - * as keys into the distinctify hash table. 
- */ void insertValueAccessorIntoDistinctifyHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &key_ids, + const std::vector<MultiSourceAttributeId> &argument_ids, + const std::vector<MultiSourceAttributeId> &key_ids, + const ValueAccessorMultiplexer &accessor_mux, AggregationStateHashTableBase *distinctify_hash_table) const override; + void blockUpdate() override { + block_update_ = true; + } + + void allowUpdate() override { + block_update_ = false; + } + protected: - AggregationConcreteHandle() {} + explicit AggregationConcreteHandle(const AggregationID agg_id) + : AggregationHandle(agg_id), + block_update_(false) {} template <typename HandleT, typename StateT> - StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelperFast( + StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelper( const AggregationStateHashTableBase &distinctify_hash_table) const; - template <typename HandleT, typename HashTableT> - void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast( + template <typename HandleT> + void aggregateOnDistinctifyHashTableForGroupByUnaryHelper( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *hash_table, - std::size_t index) const; + const std::size_t index, + AggregationStateHashTableBase *hash_table) const; - template <typename HandleT, typename HashTableT> - ColumnVector* finalizeHashTableHelperFast( + template <typename HandleT> + ColumnVector* finalizeHashTableHelper( const Type &result_type, const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys, - int index) const; + const std::size_t index, + std::vector<std::vector<TypedValue>> *group_by_keys) const; - template <typename HandleT, typename HashTableT> - inline TypedValue finalizeGroupInHashTableFast( + template <typename HandleT> + inline TypedValue finalizeGroupInHashTable( const AggregationStateHashTableBase &hash_table, - const std::vector<TypedValue> &group_key, - int index) 
const { + const std::size_t index, + const std::vector<TypedValue> &group_key) const { const std::uint8_t *group_state = - static_cast<const HashTableT &>(hash_table).getSingleCompositeKey(group_key, index); + static_cast<const PackedPayloadHashTable &>(hash_table) + .getSingleCompositeKey(group_key, index); DCHECK(group_state != nullptr) << "Could not find entry for specified group_key in HashTable"; - return static_cast<const HandleT *>(this)->finalizeHashTableEntryFast( - group_state); + return static_cast<const HandleT *>(this)->finalizeHashTableEntry(group_state); } - template <typename HandleT, typename HashTableT> - void mergeGroupByHashTablesHelperFast( - const AggregationStateHashTableBase &source_hash_table, - AggregationStateHashTableBase *destination_hash_table) const; + bool block_update_; private: DISALLOW_COPY_AND_ASSIGN(AggregationConcreteHandle); @@ -185,17 +173,10 @@ class HashTableAggregateFinalizer { output_column_vector_(output_column_vector) {} inline void operator()(const std::vector<TypedValue> &group_by_key, - const AggregationState &group_state) { - group_by_keys_->emplace_back(group_by_key); - output_column_vector_->appendTypedValue( - handle_.finalizeHashTableEntry(group_state)); - } - - inline void operator()(const std::vector<TypedValue> &group_by_key, - const unsigned char *byte_ptr) { + const std::uint8_t *byte_ptr) { group_by_keys_->emplace_back(group_by_key); output_column_vector_->appendTypedValue( - handle_.finalizeHashTableEntryFast(byte_ptr)); + handle_.finalizeHashTableEntry(byte_ptr)); } private: @@ -211,7 +192,7 @@ class HashTableAggregateFinalizer { template <typename HandleT, typename StateT> StateT* AggregationConcreteHandle:: - aggregateOnDistinctifyHashTableForSingleUnaryHelperFast( + aggregateOnDistinctifyHashTableForSingleUnaryHelper( const AggregationStateHashTableBase &distinctify_hash_table) const { const HandleT &handle = static_cast<const HandleT &>(*this); StateT *state = static_cast<StateT 
*>(createInitialState()); @@ -219,15 +200,14 @@ StateT* AggregationConcreteHandle:: // A lambda function which will be called on each key from the distinctify // hash table. const auto aggregate_functor = [&handle, &state]( - const TypedValue &key, const std::uint8_t &dumb_placeholder) { + const TypedValue &key, const std::uint8_t *dumb_placeholder) { // For each (unary) key in the distinctify hash table, aggregate the key // into "state". handle.iterateUnaryInl(state, key); }; - const AggregationStateFastHashTable &hash_table = - static_cast<const AggregationStateFastHashTable &>( - distinctify_hash_table); + const auto &hash_table = + static_cast<const PackedPayloadHashTable &>(distinctify_hash_table); // Invoke the lambda function "aggregate_functor" on each key from the // distinctify hash table. hash_table.forEach(&aggregate_functor); @@ -235,20 +215,20 @@ StateT* AggregationConcreteHandle:: return state; } -template <typename HandleT, typename HashTableT> +template <typename HandleT> void AggregationConcreteHandle:: - aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast( + aggregateOnDistinctifyHashTableForGroupByUnaryHelper( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const { + const std::size_t index, + AggregationStateHashTableBase *aggregation_hash_table) const { const HandleT &handle = static_cast<const HandleT &>(*this); - HashTableT *target_hash_table = - static_cast<HashTableT *>(aggregation_hash_table); + PackedPayloadHashTable *target_hash_table = + static_cast<PackedPayloadHashTable *>(aggregation_hash_table); // A lambda function which will be called on each key-value pair from the // distinctify hash table. 
const auto aggregate_functor = [&handle, &target_hash_table, &index]( - std::vector<TypedValue> &key, const bool &dumb_placeholder) { + std::vector<TypedValue> &key, const std::uint8_t *dumb_placeholder) { // For each (composite) key vector in the distinctify hash table with size N. // The first N-1 entries are GROUP BY columns and the last entry is the // argument to be aggregated on. @@ -258,28 +238,28 @@ void AggregationConcreteHandle:: // An upserter as lambda function for aggregating the argument into its // GROUP BY group's entry inside aggregation_hash_table. const auto upserter = [&handle, &argument](std::uint8_t *state) { - handle.iterateUnaryInlFast(argument, state); + handle.iterateUnaryInl(argument, state); }; - target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter, index); + target_hash_table->upsertCompositeKey(key, &upserter, index); }; - const HashTableT &source_hash_table = - static_cast<const HashTableT &>(distinctify_hash_table); + const PackedPayloadHashTable &source_hash_table = + static_cast<const PackedPayloadHashTable &>(distinctify_hash_table); // Invoke the lambda function "aggregate_functor" on each composite key vector // from the distinctify hash table. 
- source_hash_table.forEachCompositeKeyFast(&aggregate_functor); + source_hash_table.forEachCompositeKey(&aggregate_functor); } -template <typename HandleT, typename HashTableT> -ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast( +template <typename HandleT> +ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper( const Type &result_type, const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys, - int index) const { + const std::size_t index, + std::vector<std::vector<TypedValue>> *group_by_keys) const { const HandleT &handle = static_cast<const HandleT &>(*this); - const HashTableT &hash_table_concrete = - static_cast<const HashTableT &>(hash_table); + const PackedPayloadHashTable &hash_table_concrete = + static_cast<const PackedPayloadHashTable &>(hash_table); if (group_by_keys->empty()) { if (NativeColumnVector::UsableForType(result_type)) { @@ -287,14 +267,14 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast( new NativeColumnVector(result_type, hash_table_concrete.numEntries()); HashTableAggregateFinalizer<HandleT, NativeColumnVector> finalizer( handle, group_by_keys, result); - hash_table_concrete.forEachCompositeKeyFast(&finalizer, index); + hash_table_concrete.forEachCompositeKey(&finalizer, index); return result; } else { IndirectColumnVector *result = new IndirectColumnVector( result_type, hash_table_concrete.numEntries()); HashTableAggregateFinalizer<HandleT, IndirectColumnVector> finalizer( handle, group_by_keys, result); - hash_table_concrete.forEachCompositeKeyFast(&finalizer, index); + hash_table_concrete.forEachCompositeKey(&finalizer, index); return result; } } else { @@ -303,8 +283,8 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast( new NativeColumnVector(result_type, group_by_keys->size()); for (const std::vector<TypedValue> &group_by_key : *group_by_keys) { result->appendTypedValue( - finalizeGroupInHashTableFast<HandleT, 
HashTableT>( - hash_table, group_by_key, index)); + finalizeGroupInHashTable<HandleT>( + hash_table, index, group_by_key)); } return result; } else { @@ -312,8 +292,8 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast( result_type, hash_table_concrete.numEntries()); for (const std::vector<TypedValue> &group_by_key : *group_by_keys) { result->appendTypedValue( - finalizeGroupInHashTableFast<HandleT, HashTableT>( - hash_table, group_by_key, index)); + finalizeGroupInHashTable<HandleT>( + hash_table, index, group_by_key)); } return result; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandle.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp index 4b51179..9c7b166 100644 --- a/expressions/aggregation/AggregationHandle.hpp +++ b/expressions/aggregation/AggregationHandle.hpp @@ -21,20 +21,21 @@ #define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_HPP_ #include <cstddef> -#include <memory> #include <vector> -#include "catalog/CatalogTypedefs.hpp" +#include "expressions/aggregation/AggregationID.hpp" #include "storage/HashTableBase.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" #include "types/TypedValue.hpp" #include "utility/Macros.hpp" +#include "glog/logging.h" + namespace quickstep { class ColumnVector; class StorageManager; class Type; -class ValueAccessor; /** \addtogroup Expressions * @{ @@ -109,34 +110,34 @@ class AggregationHandle { virtual ~AggregationHandle() {} /** - * @brief Create an initial "blank" state for this aggregation. + * @brief Get the ID of this aggregation. * - * @return An initial "blank" state for this particular aggregation. + * @return The AggregationID of this AggregationHandle. 
**/ - virtual AggregationState* createInitialState() const = 0; + inline AggregationID getAggregationID() const { + return agg_id_; + } /** - * @brief Create a new HashTable for aggregation with GROUP BY. + * @brief Get the list of Types (in order) for arguments to this aggregation. * - * @param hash_table_impl The choice of which concrete HashTable - * implementation to use. - * @param group_by_types The types of the GROUP BY columns/expressions. These - * correspond to the (composite) key type for the HashTable. - * @param estimated_num_groups The estimated number of distinct groups for - * the GROUP BY aggregation. This is used to size the initial - * HashTable. This is an estimate only, and the HashTable will be - * resized if it becomes over-full. - * @param storage_manager The StorageManager to use to create the HashTable. - * A StorageBlob will be allocated to serve as the HashTable's - * in-memory storage. - * @return A new HashTable instance with the appropriate state type for this - * aggregate. + * @return The list of Types for arguments to this aggregation. + */ + virtual std::vector<const Type *> getArgumentTypes() const = 0; + + /** + * @brief Get the result Type of this aggregation. + * + * @return The result Type of this aggregation. + */ + virtual const Type* getResultType() const = 0; + + /** + * @brief Create an initial "blank" state for this aggregation. + * + * @return An initial "blank" state for this particular aggregation. 
**/ - virtual AggregationStateHashTableBase* createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector<const Type *> &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const = 0; + virtual AggregationState* createInitialState() const = 0; /** * @brief Accumulate over tuples for a nullary aggregate function (one that @@ -146,70 +147,31 @@ class AggregationHandle { * data is accessed, the only thing that a nullary aggeregate can know * about input is its cardinality. * @return A new AggregationState which contains the accumulated results from - * applying the (nullary) aggregate to the specified number of - * tuples. + * applying the (nullary) aggregate to the specified number of tuples. **/ virtual AggregationState* accumulateNullary( - const std::size_t num_tuples) const = 0; - - /** - * @brief Accumulate (iterate over) all values in one or more ColumnVectors - * and return a new AggregationState which can be merged with other - * states or finalized. - * - * @param column_vectors One or more ColumnVectors that the aggregate will be - * applied to. These correspond to the aggregate function's arguments, - * in order. - * @return A new AggregationState which contains the accumulated results from - * applying the aggregate to column_vectors. Caller is responsible - * for deleting the returned AggregationState. - **/ - virtual AggregationState* accumulateColumnVectors( - const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const = 0; + const std::size_t num_tuples) const { + LOG(FATAL) << "Called accumulateNullary on an AggregationHandle that " + << "takes at least one argument."; + } -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION /** * @brief Accumulate (iterate over) all values in columns accessible through - * a ValueAccessor and return a new AggregationState which can be - * merged with other states or finalized. 
+ * the ValueAccessors from a ValueAccessorMultiplexer and return a new + * AggregationState which can be merged with other states or finalized. * - * @param accessor A ValueAccessor that the columns to be aggregated can be - * accessed through. - * @param accessor_ids The attribute_ids that correspond to the columns in - * accessor to aggeregate. These correspond to the aggregate - * function's arguments, in order. + * @param argument_ids The multi-source attribute ids that correspond to the + * columns in \p accessor_mux to aggeregate. These correspond to the + * aggregate function's arguments, in order. + * @param accessor_mux A ValueAccessorMultiplexer object that contains the + * ValueAccessors. * @return A new AggregationState which contains the accumulated results from - * applying the aggregate to the specified columns in accessor. + * applying the aggregate to the specified columns. * Caller is responsible for deleting the returned AggregationState. **/ virtual AggregationState* accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector<attribute_id> &accessor_ids) const = 0; -#endif - - /** - * @brief Perform an aggregation with GROUP BY over all the tuples accessible - * through a ValueAccessor, upserting states in a HashTable. - * - * @note Implementations of this method are threadsafe with respect to - * hash_table, and can be called concurrently from multiple threads - * with the same HashTable object. - * - * @param accessor The ValueAccessor that will be iterated over to read - * tuples. - * @param argument_ids The attribute_ids of the arguments to this aggregate - * in accessor, in order. - * @param group_by_key_ids The attribute_ids of the group-by - * columns/expressions in accessor. - * @param hash_table The HashTable to upsert AggregationStates in. This - * should have been created by calling createGroupByHashTable() on - * this same AggregationHandle. 
- **/ - virtual void aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &argument_ids, - const std::vector<attribute_id> &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const = 0; + const std::vector<MultiSourceAttributeId> &argument_ids, + const ValueAccessorMultiplexer &accessor_mux) const = 0; /** * @brief Merge two AggregationStates, updating one in-place. This computes a @@ -253,24 +215,24 @@ class AggregationHandle { * @param hash_table The HashTable to finalize states from. This should have * have been created by calling createGroupByHashTable() on this same * AggregationHandle. + * @param index The index of the AggregationHandle to be finalized. * @param group_by_keys A pointer to a vector of vectors of GROUP BY keys. If * this is initially empty, it will be filled in with the GROUP BY * keys visited by this method in the same order as the finalized * values returned in the ColumnVector. If this is already filled in, * then this method will visit the GROUP BY keys in the exact order * specified. - * @param index The index of the AggregationHandle to be finalized. * * @return A ColumnVector containing each group's finalized aggregate value. **/ virtual ColumnVector* finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys, - int index) const = 0; + const std::size_t index, + std::vector<std::vector<TypedValue>> *group_by_keys) const = 0; /** * @brief Create a new HashTable for the distinctify step for DISTINCT - * aggregation. + * aggregation. * * Distinctify is the first step for DISTINCT aggregation. This step inserts * the GROUP BY expression values and aggregation arguments together as keys @@ -283,8 +245,8 @@ class AggregationHandle { * we simply treat it as a special GROUP BY case that the GROUP BY expression * vector is empty. * - * @param hash_table_impl The choice of which concrete HashTable - * implementation to use. 
+ * @param hash_table_impl The choice of which concrete HashTable implementation + * to use. * @param key_types The types of the GROUP BY expressions together with the * types of the aggregation arguments. * @param estimated_num_distinct_keys The estimated number of distinct keys @@ -307,13 +269,14 @@ class AggregationHandle { /** * @brief Inserts the GROUP BY expressions and aggregation arguments together - * as keys into the distinctify hash table. + * as keys into the distinctify hash table. * - * @param accessor The ValueAccessor that will be iterated over to read - * tuples. - * @param key_ids The attribute_ids of the GROUP BY expressions in accessor - * together with the attribute_ids of the arguments to this aggregate - * in accessor, in order. + * @param argument_ids The argument ids that correspond to the columns in + * \p accessor_mux. + * @param key_ids The group-by key ids that correspond to the columns in + * \p accessor_mux. + * @param accessor_mux A ValueAccessorMultiplexer object that contains the + * ValueAccessors to be iterated over to read tuples. * @param distinctify_hash_table The HashTable to store the GROUP BY * expressions and the aggregation arguments together as hash table * keys and a bool constant \c true as hash table value (So the hash @@ -321,13 +284,14 @@ class AggregationHandle { * by calling createDistinctifyHashTable(); */ virtual void insertValueAccessorIntoDistinctifyHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &key_ids, + const std::vector<MultiSourceAttributeId> &argument_ids, + const std::vector<MultiSourceAttributeId> &key_ids, + const ValueAccessorMultiplexer &accessor_mux, AggregationStateHashTableBase *distinctify_hash_table) const = 0; /** * @brief Perform single (i.e. without GROUP BY) aggregation on the keys from - * the distinctify hash table to actually compute the aggregated results. + * the distinctify hash table to actually compute the aggregated results. 
* * @param distinctify_hash_table Hash table which stores the distinctified * aggregation arguments as hash table keys. This should have been @@ -346,25 +310,26 @@ class AggregationHandle { * @param distinctify_hash_table Hash table which stores the GROUP BY * expression values and aggregation arguments together as hash table * keys. + * @param index The index of the AggregationHandle to perform aggregation. * @param aggregation_hash_table The HashTable to upsert AggregationStates in. * This should have been created by calling createGroupByHashTable() on * this same AggregationHandle. - * @param index The index of the distinctify hash table for which we perform - * the DISTINCT aggregation. */ virtual void aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const = 0; + const std::size_t index, + AggregationStateHashTableBase *aggregation_hash_table) const = 0; /** * @brief Get the number of bytes needed to store the aggregation handle's * state. **/ - virtual std::size_t getPayloadSize() const { return 1; } + virtual std::size_t getPayloadSize() const { + return 1u; + } /** - * @brief Update the aggregation state for nullary aggregation function e.g. + * @brief Update the aggregation state for nullary aggregation function, e.g. * COUNT(*). * * @note This function should be overloaded by those aggregation function @@ -372,7 +337,10 @@ class AggregationHandle { * * @param byte_ptr The pointer where the aggregation state is stored. **/ - virtual void updateStateNullary(std::uint8_t *byte_ptr) const {} + virtual void updateStateNullary(std::uint8_t *byte_ptr) const { + LOG(FATAL) << "Called updateStateNullary on an AggregationHandle that " + << "takes at least one argument."; + } /** * @brief Update the aggregation state for unary aggregation function e.g. 
@@ -383,7 +351,7 @@ class AggregationHandle { * @param byte_ptr The pointer where the aggregation state is stored. **/ virtual void updateStateUnary(const TypedValue &argument, - std::uint8_t *byte_ptr) const {} + std::uint8_t *byte_ptr) const = 0; /** * @brief Merge two aggregation states for this aggregation handle. @@ -394,8 +362,8 @@ class AggregationHandle { * @param src A pointer to the source aggregation state. * @param dst A pointer to the destination aggregation state. **/ - virtual void mergeStatesFast(const std::uint8_t *src, - std::uint8_t *dst) const {} + virtual void mergeStates(const std::uint8_t *src, + std::uint8_t *dst) const = 0; /** * @brief Initialize the payload (in the aggregation hash table) for the given @@ -403,7 +371,7 @@ class AggregationHandle { * * @param byte_ptr The pointer to the aggregation state in the hash table. **/ - virtual void initPayload(std::uint8_t *byte_ptr) const {} + virtual void initPayload(std::uint8_t *byte_ptr) const = 0; /** * @brief Destroy the payload (in the aggregation hash table) for the given @@ -411,22 +379,25 @@ class AggregationHandle { * * @param byte_ptr The pointer to the aggregation state in the hash table. **/ - virtual void destroyPayload(std::uint8_t *byte_ptr) const {} + virtual void destroyPayload(std::uint8_t *byte_ptr) const = 0; /** * @brief Inform the aggregation handle to block (prohibit) updates on the * aggregation state. **/ - virtual void blockUpdate() {} + virtual void blockUpdate() = 0; /** - * @brief Inform the aggregation handle to allow updates on the - * aggregation state. + * @brief Inform the aggregation handle to allow updates on the aggregation + * state. 
**/ - virtual void allowUpdate() {} + virtual void allowUpdate() = 0; protected: - AggregationHandle() {} + explicit AggregationHandle(const AggregationID agg_id) + : agg_id_(agg_id) {} + + const AggregationID agg_id_; private: DISALLOW_COPY_AND_ASSIGN(AggregationHandle); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleAvg.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp index 2481092..46bec1e 100644 --- a/expressions/aggregation/AggregationHandleAvg.cpp +++ b/expressions/aggregation/AggregationHandleAvg.cpp @@ -20,12 +20,13 @@ #include "expressions/aggregation/AggregationHandleAvg.hpp" #include <cstddef> +#include <cstdint> #include <memory> #include <vector> #include "catalog/CatalogTypedefs.hpp" -#include "storage/HashTable.hpp" -#include "storage/HashTableFactory.hpp" +#include "expressions/aggregation/AggregationID.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" #include "types/TypeFactory.hpp" @@ -39,10 +40,11 @@ namespace quickstep { -class StorageManager; +class ColumnVector; AggregationHandleAvg::AggregationHandleAvg(const Type &type) - : argument_type_(type), block_update_(false) { + : AggregationConcreteHandle(AggregationID::kAvg), + argument_type_(type) { // We sum Int as Long and Float as Double so that we have more headroom when // adding many values. 
TypeID type_precision_id; @@ -87,52 +89,29 @@ AggregationHandleAvg::AggregationHandleAvg(const Type &type) ->getNullableVersion()); } -AggregationStateHashTableBase* AggregationHandleAvg::createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector<const Type *> &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const { - return AggregationStateHashTableFactory<AggregationStateAvg>::CreateResizable( - hash_table_impl, group_by_types, estimated_num_groups, storage_manager); -} +AggregationState* AggregationHandleAvg::accumulateValueAccessor( + const std::vector<MultiSourceAttributeId> &argument_ids, + const ValueAccessorMultiplexer &accessor_mux) const { + DCHECK_EQ(1u, argument_ids.size()) + << "Got wrong number of attributes for AVG: " << argument_ids.size(); -AggregationState* AggregationHandleAvg::accumulateColumnVectors( - const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const { - DCHECK_EQ(1u, column_vectors.size()) - << "Got wrong number of ColumnVectors for AVG: " << column_vectors.size(); + const ValueAccessorSource argument_source = argument_ids.front().source; + const attribute_id argument_id = argument_ids.front().attr_id; - AggregationStateAvg *state = new AggregationStateAvg(blank_state_); - std::size_t count = 0; - state->sum_ = fast_add_operator_->accumulateColumnVector( - state->sum_, *column_vectors.front(), &count); - state->count_ = count; - return state; -} - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION -AggregationState* AggregationHandleAvg::accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector<attribute_id> &accessor_ids) const { - DCHECK_EQ(1u, accessor_ids.size()) - << "Got wrong number of attributes for AVG: " << accessor_ids.size(); + DCHECK(argument_source != ValueAccessorSource::kInvalid); + DCHECK_NE(argument_id, kInvalidAttributeID); AggregationStateAvg *state = new AggregationStateAvg(blank_state_); std::size_t count = 0; 
- state->sum_ = fast_add_operator_->accumulateValueAccessor( - state->sum_, accessor, accessor_ids.front(), &count); + state->sum_ = + fast_add_operator_->accumulateValueAccessor( + state->sum_, + accessor_mux.getValueAccessorBySource(argument_source), + argument_id, + &count); state->count_ = count; return state; } -#endif - -void AggregationHandleAvg::aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &argument_ids, - const std::vector<attribute_id> &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const { - DCHECK_EQ(1u, argument_ids.size()) - << "Got wrong number of arguments for AVG: " << argument_ids.size(); -} void AggregationHandleAvg::mergeStates(const AggregationState &source, AggregationState *destination) const { @@ -147,8 +126,8 @@ void AggregationHandleAvg::mergeStates(const AggregationState &source, avg_destination->sum_, avg_source.sum_); } -void AggregationHandleAvg::mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const { +void AggregationHandleAvg::mergeStates(const std::uint8_t *source, + std::uint8_t *destination) const { const TypedValue *src_sum_ptr = reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_); const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>( @@ -177,29 +156,25 @@ TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const { ColumnVector* AggregationHandleAvg::finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys, - int index) const { - return finalizeHashTableHelperFast<AggregationHandleAvg, - AggregationStateFastHashTable>( - *result_type_, hash_table, group_by_keys, index); + const std::size_t index, + std::vector<std::vector<TypedValue>> *group_by_keys) const { + return finalizeHashTableHelper<AggregationHandleAvg>( + *result_type_, hash_table, index, group_by_keys); } -AggregationState* 
-AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle( +AggregationState* AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle( const AggregationStateHashTableBase &distinctify_hash_table) const { - return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< - AggregationHandleAvg, - AggregationStateAvg>(distinctify_hash_table); + return aggregateOnDistinctifyHashTableForSingleUnaryHelper< + AggregationHandleAvg, AggregationStateAvg>( + distinctify_hash_table); } void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const { - aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< - AggregationHandleAvg, - AggregationStateFastHashTable>( - distinctify_hash_table, aggregation_hash_table, index); + const std::size_t index, + AggregationStateHashTableBase *aggregation_hash_table) const { + aggregateOnDistinctifyHashTableForGroupByUnaryHelper<AggregationHandleAvg>( + distinctify_hash_table, index, aggregation_hash_table); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleAvg.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp index 47132c6..970561c 100644 --- a/expressions/aggregation/AggregationHandleAvg.hpp +++ b/expressions/aggregation/AggregationHandleAvg.hpp @@ -25,11 +25,9 @@ #include <memory> #include <vector> -#include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" -#include "storage/FastHashTable.hpp" -#include "storage/HashTableBase.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" #include 
"types/TypedValue.hpp" @@ -40,9 +38,8 @@ namespace quickstep { +class AggregationStateHashTableBase; class ColumnVector; -class StorageManager; -class ValueAccessor; /** \addtogroup Expressions * @{ @@ -106,19 +103,18 @@ class AggregationHandleAvg : public AggregationConcreteHandle { public: ~AggregationHandleAvg() override {} + std::vector<const Type *> getArgumentTypes() const override { + return {&argument_type_}; + } + + const Type* getResultType() const override { + return result_type_; + } + AggregationState* createInitialState() const override { return new AggregationStateAvg(blank_state_); } - AggregationStateHashTableBase* createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector<const Type *> &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const override; - - /** - * @brief Iterate method with average aggregation state. - **/ inline void iterateUnaryInl(AggregationStateAvg *state, const TypedValue &value) const { DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature())); @@ -129,8 +125,8 @@ class AggregationHandleAvg : public AggregationConcreteHandle { ++state->count_; } - inline void iterateUnaryInlFast(const TypedValue &value, - std::uint8_t *byte_ptr) const { + inline void iterateUnaryInl(const TypedValue &value, + std::uint8_t *byte_ptr) const { DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature())); if (value.isNull()) return; TypedValue *sum_ptr = @@ -141,16 +137,18 @@ class AggregationHandleAvg : public AggregationConcreteHandle { ++(*count_ptr); } - inline void updateStateUnary(const TypedValue &argument, - std::uint8_t *byte_ptr) const override { - if (!block_update_) { - iterateUnaryInlFast(argument, byte_ptr); - } - } + AggregationState* accumulateValueAccessor( + const std::vector<MultiSourceAttributeId> &argument_ids, + const ValueAccessorMultiplexer &accessor_mux) const override; - void blockUpdate() override { block_update_ = true; } + void 
mergeStates(const AggregationState &source, + AggregationState *destination) const override; - void allowUpdate() override { block_update_ = false; } + TypedValue finalize(const AggregationState &state) const override; + + std::size_t getPayloadSize() const override { + return blank_state_.getPayloadSize(); + } void initPayload(std::uint8_t *byte_ptr) const override { TypedValue *sum_ptr = @@ -169,43 +167,17 @@ class AggregationHandleAvg : public AggregationConcreteHandle { } } - AggregationState* accumulateColumnVectors( - const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) - const override; - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - AggregationState* accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector<attribute_id> &accessor_id) const override; -#endif - - void aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &argument_ids, - const std::vector<attribute_id> &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const override; - - void mergeStates(const AggregationState &source, - AggregationState *destination) const override; - - void mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const override; + inline void updateStateUnary(const TypedValue &argument, + std::uint8_t *byte_ptr) const override { + if (!block_update_) { + iterateUnaryInl(argument, byte_ptr); + } + } - TypedValue finalize(const AggregationState &state) const override; + void mergeStates(const std::uint8_t *source, + std::uint8_t *destination) const override; inline TypedValue finalizeHashTableEntry( - const AggregationState &state) const { - const AggregationStateAvg &agg_state = - static_cast<const AggregationStateAvg &>(state); - // TODO(chasseur): Could improve performance further if we made a special - // version of finalizeHashTable() that collects all the sums into one - // ColumnVector and all the counts into another and then applies - // '*divide_operator_' 
to them in bulk. - return divide_operator_->applyToTypedValues( - agg_state.sum_, TypedValue(static_cast<double>(agg_state.count_))); - } - - inline TypedValue finalizeHashTableEntryFast( const std::uint8_t *byte_ptr) const { std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr); TypedValue *sum_ptr = @@ -218,31 +190,16 @@ class AggregationHandleAvg : public AggregationConcreteHandle { ColumnVector* finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys, - int index) const override; + const std::size_t index, + std::vector<std::vector<TypedValue>> *group_by_keys) const override; - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForSingle() - * for AVG aggregation. - */ AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) - const override; + const AggregationStateHashTableBase &distinctify_hash_table) const override; - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() - * for AVG aggregation. 
- */ void aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const override; - - std::size_t getPayloadSize() const override { - return blank_state_.getPayloadSize(); - } + const std::size_t index, + AggregationStateHashTableBase *aggregation_hash_table) const override; private: friend class AggregateFunctionAvg; @@ -261,8 +218,6 @@ class AggregationHandleAvg : public AggregationConcreteHandle { std::unique_ptr<UncheckedBinaryOperator> merge_add_operator_; std::unique_ptr<UncheckedBinaryOperator> divide_operator_; - bool block_update_; - DISALLOW_COPY_AND_ASSIGN(AggregationHandleAvg); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleCount.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp index 034c942..cf92ec7 100644 --- a/expressions/aggregation/AggregationHandleCount.cpp +++ b/expressions/aggregation/AggregationHandleCount.cpp @@ -21,100 +21,48 @@ #include <atomic> #include <cstddef> -#include <memory> +#include <cstdint> #include <vector> #include "catalog/CatalogTypedefs.hpp" -#include "storage/HashTable.hpp" -#include "storage/HashTableFactory.hpp" - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION #include "storage/ValueAccessor.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" #include "storage/ValueAccessorUtil.hpp" -#endif - #include "types/TypeFactory.hpp" #include "types/TypeID.hpp" #include "types/TypedValue.hpp" -#include "types/containers/ColumnVector.hpp" -#include "types/containers/ColumnVectorUtil.hpp" #include "glog/logging.h" namespace quickstep { -class StorageManager; -class Type; -class ValueAccessor; +class ColumnVector; template <bool count_star, bool nullable_type> -AggregationStateHashTableBase* 
-AggregationHandleCount<count_star, nullable_type>::createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector<const Type *> &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const { - return AggregationStateHashTableFactory< - AggregationStateCount>::CreateResizable(hash_table_impl, - group_by_types, - estimated_num_groups, - storage_manager); -} - -template <bool count_star, bool nullable_type> -AggregationState* -AggregationHandleCount<count_star, nullable_type>::accumulateColumnVectors( - const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const { +AggregationState* AggregationHandleCount<count_star, nullable_type> + ::accumulateValueAccessor( + const std::vector<MultiSourceAttributeId> &argument_ids, + const ValueAccessorMultiplexer &accessor_mux) const { DCHECK(!count_star) << "Called non-nullary accumulation method on an AggregationHandleCount " << "set up for nullary COUNT(*)"; - DCHECK_EQ(1u, column_vectors.size()) - << "Got wrong number of ColumnVectors for COUNT: " - << column_vectors.size(); + DCHECK_EQ(1u, argument_ids.size()) + << "Got wrong number of attributes for COUNT: " << argument_ids.size(); - std::size_t count = 0; - InvokeOnColumnVector( - *column_vectors.front(), - [&](const auto &column_vector) -> void { // NOLINT(build/c++11) - if (nullable_type) { - // TODO(shoban): Iterating over the ColumnVector is a rather slow way - // to do this. We should look at extending the ColumnVector interface - // to do a quick count of the non-null values (i.e. the length minus - // the population count of the null bitmap). We should do something - // similar for ValueAccessor too. 
- for (std::size_t pos = 0; pos < column_vector.size(); ++pos) { - count += !column_vector.getTypedValue(pos).isNull(); - } - } else { - count = column_vector.size(); - } - }); + const ValueAccessorSource argument_source = argument_ids.front().source; + const attribute_id argument_id = argument_ids.front().attr_id; - return new AggregationStateCount(count); -} + DCHECK(argument_source != ValueAccessorSource::kInvalid); + DCHECK_NE(argument_id, kInvalidAttributeID); -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION -template <bool count_star, bool nullable_type> -AggregationState* -AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector<attribute_id> &accessor_ids) const { - DCHECK(!count_star) - << "Called non-nullary accumulation method on an AggregationHandleCount " - << "set up for nullary COUNT(*)"; - - DCHECK_EQ(1u, accessor_ids.size()) - << "Got wrong number of attributes for COUNT: " << accessor_ids.size(); - - const attribute_id accessor_id = accessor_ids.front(); std::size_t count = 0; InvokeOnValueAccessorMaybeTupleIdSequenceAdapter( - accessor, - [&accessor_id, &count](auto *accessor) -> void { // NOLINT(build/c++11) + accessor_mux.getValueAccessorBySource(argument_source), + [&argument_id, &count](auto *accessor) -> void { // NOLINT(build/c++11) if (nullable_type) { while (accessor->next()) { - count += !accessor->getTypedValue(accessor_id).isNull(); + count += !accessor->getTypedValue(argument_id).isNull(); } } else { count = accessor->getNumTuples(); @@ -123,24 +71,6 @@ AggregationHandleCount<count_star, nullable_type>::accumulateValueAccessor( return new AggregationStateCount(count); } -#endif - -template <bool count_star, bool nullable_type> -void AggregationHandleCount<count_star, nullable_type>:: - aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &argument_ids, - const std::vector<attribute_id> &group_by_key_ids, - 
AggregationStateHashTableBase *hash_table) const { - if (count_star) { - DCHECK_EQ(0u, argument_ids.size()) - << "Got wrong number of arguments for COUNT(*): " - << argument_ids.size(); - } else { - DCHECK_EQ(1u, argument_ids.size()) - << "Got wrong number of arguments for COUNT: " << argument_ids.size(); - } -} template <bool count_star, bool nullable_type> void AggregationHandleCount<count_star, nullable_type>::mergeStates( @@ -156,7 +86,7 @@ void AggregationHandleCount<count_star, nullable_type>::mergeStates( } template <bool count_star, bool nullable_type> -void AggregationHandleCount<count_star, nullable_type>::mergeStatesFast( +void AggregationHandleCount<count_star, nullable_type>::mergeStates( const std::uint8_t *source, std::uint8_t *destination) const { const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>(source); @@ -165,38 +95,35 @@ void AggregationHandleCount<count_star, nullable_type>::mergeStatesFast( } template <bool count_star, bool nullable_type> -ColumnVector* -AggregationHandleCount<count_star, nullable_type>::finalizeHashTable( - const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys, - int index) const { - return finalizeHashTableHelperFast< - AggregationHandleCount<count_star, nullable_type>, - AggregationStateFastHashTable>( - TypeFactory::GetType(kLong), hash_table, group_by_keys, index); +ColumnVector* AggregationHandleCount<count_star, nullable_type> + ::finalizeHashTable( + const AggregationStateHashTableBase &hash_table, + const std::size_t index, + std::vector<std::vector<TypedValue>> *group_by_keys) const { + return finalizeHashTableHelper< + AggregationHandleCount<count_star, nullable_type>>( + TypeFactory::GetType(kLong), hash_table, index, group_by_keys); } template <bool count_star, bool nullable_type> -AggregationState* AggregationHandleCount<count_star, nullable_type>:: - aggregateOnDistinctifyHashTableForSingle( +AggregationState* 
AggregationHandleCount<count_star, nullable_type> + ::aggregateOnDistinctifyHashTableForSingle( const AggregationStateHashTableBase &distinctify_hash_table) const { - DCHECK_EQ(count_star, false); - return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< + return aggregateOnDistinctifyHashTableForSingleUnaryHelper< AggregationHandleCount<count_star, nullable_type>, - AggregationStateCount>(distinctify_hash_table); + AggregationStateCount>( + distinctify_hash_table); } template <bool count_star, bool nullable_type> -void AggregationHandleCount<count_star, nullable_type>:: - aggregateOnDistinctifyHashTableForGroupBy( +void AggregationHandleCount<count_star, nullable_type> + ::aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const { - DCHECK_EQ(count_star, false); - aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< - AggregationHandleCount<count_star, nullable_type>, - AggregationStateFastHashTable>( - distinctify_hash_table, aggregation_hash_table, index); + const std::size_t index, + AggregationStateHashTableBase *aggregation_hash_table) const { + aggregateOnDistinctifyHashTableForGroupByUnaryHelper< + AggregationHandleCount<count_star, nullable_type>>( + distinctify_hash_table, index, aggregation_hash_table); } // Explicitly instantiate and compile in the different versions of http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleCount.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp index 6aab0cd..72ea923 100644 --- a/expressions/aggregation/AggregationHandleCount.hpp +++ b/expressions/aggregation/AggregationHandleCount.hpp @@ -23,23 +23,21 @@ #include <atomic> #include <cstddef> #include <cstdint> -#include <memory> 
#include <vector> -#include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" -#include "storage/FastHashTable.hpp" -#include "storage/HashTableBase.hpp" +#include "expressions/aggregation/AggregationID.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" +#include "types/LongType.hpp" #include "types/TypedValue.hpp" #include "utility/Macros.hpp" namespace quickstep { +class AggregationStateHashTableBase; class ColumnVector; -class StorageManager; class Type; -class ValueAccessor; template <bool, bool> class AggregationHandleCount; @@ -98,28 +96,31 @@ class AggregationHandleCount : public AggregationConcreteHandle { public: ~AggregationHandleCount() override {} + std::vector<const Type *> getArgumentTypes() const override { + if (argument_type_ == nullptr) { + return {}; + } else { + return {argument_type_}; + } + } + + const Type* getResultType() const override { + return &LongType::InstanceNonNullable(); + } + AggregationState* createInitialState() const override { return new AggregationStateCount(); } - AggregationStateHashTableBase* createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector<const Type *> &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const override; - inline void iterateNullaryInl(AggregationStateCount *state) const { state->count_.fetch_add(1, std::memory_order_relaxed); } - inline void iterateNullaryInlFast(std::uint8_t *byte_ptr) const { + inline void iterateNullaryInl(std::uint8_t *byte_ptr) const { std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr); - (*count_ptr)++; + ++(*count_ptr); } - /** - * @brief Iterate with count aggregation state. 
- */ inline void iterateUnaryInl(AggregationStateCount *state, const TypedValue &value) const { if ((!nullable_type) || (!value.isNull())) { @@ -127,118 +128,89 @@ class AggregationHandleCount : public AggregationConcreteHandle { } } - inline void iterateUnaryInlFast(const TypedValue &value, - std::uint8_t *byte_ptr) const { + inline void iterateUnaryInl(const TypedValue &value, + std::uint8_t *byte_ptr) const { if ((!nullable_type) || (!value.isNull())) { - std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr); - (*count_ptr)++; - } - } - - inline void updateStateUnary(const TypedValue &argument, - std::uint8_t *byte_ptr) const override { - if (!block_update_) { - iterateUnaryInlFast(argument, byte_ptr); + ++(*reinterpret_cast<std::int64_t *>(byte_ptr)); } } - inline void updateStateNullary(std::uint8_t *byte_ptr) const override { - if (!block_update_) { - iterateNullaryInlFast(byte_ptr); - } - } - - void blockUpdate() override { block_update_ = true; } - - void allowUpdate() override { block_update_ = false; } - - void initPayload(std::uint8_t *byte_ptr) const override { - std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr); - *count_ptr = 0; - } - AggregationState* accumulateNullary( const std::size_t num_tuples) const override { return new AggregationStateCount(num_tuples); } - AggregationState* accumulateColumnVectors( - const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) - const override; - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION AggregationState* accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector<attribute_id> &accessor_id) const override; -#endif - - void aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &argument_ids, - const std::vector<attribute_id> &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const override; + const std::vector<MultiSourceAttributeId> &argument_ids, + const ValueAccessorMultiplexer 
&accessor_mux) const override; void mergeStates(const AggregationState &source, AggregationState *destination) const override; - void mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const override; - TypedValue finalize(const AggregationState &state) const override { return TypedValue( static_cast<const AggregationStateCount &>(state).count_.load( std::memory_order_relaxed)); } - inline TypedValue finalizeHashTableEntry( - const AggregationState &state) const { - return TypedValue( - static_cast<const AggregationStateCount &>(state).count_.load( - std::memory_order_relaxed)); + std::size_t getPayloadSize() const override { + return sizeof(std::int64_t); + } + + void initPayload(std::uint8_t *byte_ptr) const override { + std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr); + *count_ptr = 0; } - inline TypedValue finalizeHashTableEntryFast( - const std::uint8_t *byte_ptr) const { - const std::int64_t *count_ptr = - reinterpret_cast<const std::int64_t *>(byte_ptr); - return TypedValue(*count_ptr); + void destroyPayload(std::uint8_t *byte_ptr) const override {} + + inline void updateStateNullary(std::uint8_t *byte_ptr) const override { + if (!block_update_) { + iterateNullaryInl(byte_ptr); + } + } + + inline void updateStateUnary(const TypedValue &argument, + std::uint8_t *byte_ptr) const override { + if (!block_update_) { + iterateUnaryInl(argument, byte_ptr); + } + } + + void mergeStates(const std::uint8_t *source, + std::uint8_t *destination) const override; + + inline TypedValue finalizeHashTableEntry(const std::uint8_t *byte_ptr) const { + return TypedValue(*reinterpret_cast<const std::int64_t *>(byte_ptr)); } ColumnVector* finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys, - int index) const override; + const std::size_t index, + std::vector<std::vector<TypedValue>> *group_by_keys) const override; - /** - * @brief Implementation of - * 
AggregationHandle::aggregateOnDistinctifyHashTableForSingle() - * for SUM aggregation. - */ AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) - const override; + const AggregationStateHashTableBase &distinctify_hash_table) const override; - /** - * @brief Implementation of - * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy() - * for SUM aggregation. - */ void aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const override; - - std::size_t getPayloadSize() const override { return sizeof(std::int64_t); } + const std::size_t index, + AggregationStateHashTableBase *aggregation_hash_table) const override; private: friend class AggregateFunctionCount; /** - * @brief Constructor. + * @brief Initialize handle for count. + * + * @param argument_type Type of the value to be counted. The parameter should + * be nullptr for nullary aggregation (i.e. COUNT(*)). **/ - AggregationHandleCount() : block_update_(false) {} + explicit AggregationHandleCount(const Type *argument_type) + : AggregationConcreteHandle(AggregationID::kCount), + argument_type_(argument_type) {} - bool block_update_; + const Type *argument_type_; DISALLOW_COPY_AND_ASSIGN(AggregationHandleCount); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleDistinct.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp deleted file mode 100644 index 0dc8b56..0000000 --- a/expressions/aggregation/AggregationHandleDistinct.cpp +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#include "expressions/aggregation/AggregationHandleDistinct.hpp" - -#include <cstddef> -#include <memory> -#include <vector> -#include <utility> - -#include "catalog/CatalogTypedefs.hpp" -#include "storage/HashTable.hpp" - -#include "types/TypedValue.hpp" - -#include "glog/logging.h" - -namespace quickstep { - -class ColumnVector; -class StorageManager; -class Type; -class ValueAccessor; - -AggregationStateHashTableBase* AggregationHandleDistinct::createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector<const Type*> &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const { - return createDistinctifyHashTable( - hash_table_impl, - group_by_types, - estimated_num_groups, - storage_manager); -} - -void AggregationHandleDistinct::aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &argument_ids, - const std::vector<attribute_id> &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const { - DCHECK_EQ(argument_ids.size(), 0u); - - insertValueAccessorIntoDistinctifyHashTable( - accessor, - group_by_key_ids, - hash_table); -} - -ColumnVector* AggregationHandleDistinct::finalizeHashTable( - const AggregationStateHashTableBase &hash_table, - 
std::vector<std::vector<TypedValue>> *group_by_keys, - int index) const { - DCHECK(group_by_keys->empty()); - - const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key, - const bool &dumb_placeholder) -> void { - group_by_keys->emplace_back(std::move(group_by_key)); - }; - static_cast<const AggregationStateFastHashTable&>(hash_table).forEachCompositeKeyFast(&keys_retriever); - - return nullptr; -} - -} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleDistinct.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp deleted file mode 100644 index 838bfdd..0000000 --- a/expressions/aggregation/AggregationHandleDistinct.hpp +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- **/ - -#ifndef QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_ -#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_ - -#include <cstddef> -#include <memory> -#include <vector> - -#include "catalog/CatalogTypedefs.hpp" -#include "expressions/aggregation/AggregationConcreteHandle.hpp" -#include "storage/HashTableBase.hpp" -#include "types/TypedValue.hpp" -#include "utility/Macros.hpp" - -#include "glog/logging.h" - -namespace quickstep { - -class AggregationState; -class ColumnVector; -class StorageManager; -class Type; -class ValueAccessor; - -/** \addtogroup Expressions - * @{ - */ - -class AggregationHandleDistinct : public AggregationConcreteHandle { - public: - /** - * @brief Constructor. - **/ - AggregationHandleDistinct() {} - - AggregationState* createInitialState() const override { - LOG(FATAL) - << "AggregationHandleDistinct does not support createInitialState()."; - } - - AggregationState* accumulateNullary( - const std::size_t num_tuples) const override { - LOG(FATAL) - << "AggregationHandleDistinct does not support accumulateNullary()."; - } - - AggregationState* accumulateColumnVectors( - const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) - const override { - LOG(FATAL) << "AggregationHandleDistinct does not support " - "accumulateColumnVectors()."; - } - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - AggregationState* accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector<attribute_id> &accessor_ids) const override { - LOG(FATAL) << "AggregationHandleDistinct does not support " - "accumulateValueAccessor()."; - } -#endif - - void mergeStates(const AggregationState &source, - AggregationState *destination) const override { - LOG(FATAL) << "AggregationHandleDistinct does not support mergeStates()."; - } - - TypedValue finalize(const AggregationState &state) const override { - LOG(FATAL) << "AggregationHandleDistinct does not support finalize()."; - } - - 
AggregationState* aggregateOnDistinctifyHashTableForSingle( - const AggregationStateHashTableBase &distinctify_hash_table) - const override { - LOG(FATAL) << "AggregationHandleDistinct does not support " - << "aggregateOnDistinctifyHashTableForSingle()."; - } - - void aggregateOnDistinctifyHashTableForGroupBy( - const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *groupby_hash_table, - std::size_t index) const override { - LOG(FATAL) << "AggregationHandleDistinct does not support " - << "aggregateOnDistinctifyHashTableForGroupBy()."; - } - - AggregationStateHashTableBase* createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector<const Type *> &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const override; - - void aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &argument_ids, - const std::vector<attribute_id> &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const override; - - ColumnVector* finalizeHashTable( - const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys, - int index) const override; - - private: - DISALLOW_COPY_AND_ASSIGN(AggregationHandleDistinct); -}; - -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2d89e4fb/expressions/aggregation/AggregationHandleMax.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp index c2d571b..fe1773f 100644 --- a/expressions/aggregation/AggregationHandleMax.cpp +++ b/expressions/aggregation/AggregationHandleMax.cpp @@ -19,15 +19,16 @@ #include "expressions/aggregation/AggregationHandleMax.hpp" +#include <cstddef> +#include <cstdint> 
#include <memory> #include <vector> #include "catalog/CatalogTypedefs.hpp" -#include "storage/HashTable.hpp" -#include "storage/HashTableFactory.hpp" +#include "expressions/aggregation/AggregationID.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" #include "types/Type.hpp" #include "types/TypedValue.hpp" -#include "types/containers/ColumnVector.hpp" #include "types/operations/comparisons/Comparison.hpp" #include "types/operations/comparisons/ComparisonFactory.hpp" #include "types/operations/comparisons/ComparisonID.hpp" @@ -36,54 +37,32 @@ namespace quickstep { -class StorageManager; +class ColumnVector; AggregationHandleMax::AggregationHandleMax(const Type &type) - : type_(type), block_update_(false) { + : AggregationConcreteHandle(AggregationID::kMax), + type_(type) { fast_comparator_.reset( ComparisonFactory::GetComparison(ComparisonID::kGreater) .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion())); } -AggregationStateHashTableBase* AggregationHandleMax::createGroupByHashTable( - const HashTableImplType hash_table_impl, - const std::vector<const Type *> &group_by_types, - const std::size_t estimated_num_groups, - StorageManager *storage_manager) const { - return AggregationStateHashTableFactory<AggregationStateMax>::CreateResizable( - hash_table_impl, group_by_types, estimated_num_groups, storage_manager); -} +AggregationState* AggregationHandleMax::accumulateValueAccessor( + const std::vector<MultiSourceAttributeId> &argument_ids, + const ValueAccessorMultiplexer &accessor_mux) const { + DCHECK_EQ(1u, argument_ids.size()) + << "Got wrong number of attributes for MAX: " << argument_ids.size(); -AggregationState* AggregationHandleMax::accumulateColumnVectors( - const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const { - DCHECK_EQ(1u, column_vectors.size()) - << "Got wrong number of ColumnVectors for MAX: " << column_vectors.size(); + const ValueAccessorSource argument_source = argument_ids.front().source; + const attribute_id 
argument_id = argument_ids.front().attr_id; - return new AggregationStateMax(fast_comparator_->accumulateColumnVector( - type_.getNullableVersion().makeNullValue(), *column_vectors.front())); -} - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION -AggregationState* AggregationHandleMax::accumulateValueAccessor( - ValueAccessor *accessor, - const std::vector<attribute_id> &accessor_ids) const { - DCHECK_EQ(1u, accessor_ids.size()) - << "Got wrong number of attributes for MAX: " << accessor_ids.size(); + DCHECK(argument_source != ValueAccessorSource::kInvalid); + DCHECK_NE(argument_id, kInvalidAttributeID); return new AggregationStateMax(fast_comparator_->accumulateValueAccessor( type_.getNullableVersion().makeNullValue(), - accessor, - accessor_ids.front())); -} -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - -void AggregationHandleMax::aggregateValueAccessorIntoHashTable( - ValueAccessor *accessor, - const std::vector<attribute_id> &argument_ids, - const std::vector<attribute_id> &group_by_key_ids, - AggregationStateHashTableBase *hash_table) const { - DCHECK_EQ(1u, argument_ids.size()) - << "Got wrong number of arguments for MAX: " << argument_ids.size(); + accessor_mux.getValueAccessorBySource(argument_source), + argument_id)); } void AggregationHandleMax::mergeStates(const AggregationState &source, @@ -98,40 +77,36 @@ void AggregationHandleMax::mergeStates(const AggregationState &source, } } -void AggregationHandleMax::mergeStatesFast(const std::uint8_t *source, - std::uint8_t *destination) const { +void AggregationHandleMax::mergeStates(const std::uint8_t *source, + std::uint8_t *destination) const { const TypedValue *src_max_ptr = reinterpret_cast<const TypedValue *>(source); TypedValue *dst_max_ptr = reinterpret_cast<TypedValue *>(destination); if (!(src_max_ptr->isNull())) { - compareAndUpdateFast(dst_max_ptr, *src_max_ptr); + compareAndUpdate(dst_max_ptr, *src_max_ptr); } } ColumnVector* AggregationHandleMax::finalizeHashTable( const 
AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys, - int index) const { - return finalizeHashTableHelperFast<AggregationHandleMax, - AggregationStateFastHashTable>( - type_.getNullableVersion(), hash_table, group_by_keys, index); + const std::size_t index, + std::vector<std::vector<TypedValue>> *group_by_keys) const { + return finalizeHashTableHelper<AggregationHandleMax>( + type_, hash_table, index, group_by_keys); } -AggregationState* -AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle( +AggregationState* AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle( const AggregationStateHashTableBase &distinctify_hash_table) const { - return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< - AggregationHandleMax, - AggregationStateMax>(distinctify_hash_table); + return aggregateOnDistinctifyHashTableForSingleUnaryHelper< + AggregationHandleMax, AggregationStateMax>( + distinctify_hash_table); } void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table, - std::size_t index) const { - aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< - AggregationHandleMax, - AggregationStateFastHashTable>( - distinctify_hash_table, aggregation_hash_table, index); + const std::size_t index, + AggregationStateHashTableBase *aggregation_hash_table) const { + aggregateOnDistinctifyHashTableForGroupByUnaryHelper<AggregationHandleMax>( + distinctify_hash_table, index, aggregation_hash_table); } } // namespace quickstep