Initial commit for QUICKSTEP-28 and QUICKSTEP-29. Code refactoring, cleanup, and some more optimizations are pending.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d0756e7e Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d0756e7e Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d0756e7e Branch: refs/heads/partitioned-aggregation Commit: d0756e7e0c31f65ee8de3cadaaf2d3f037d913b5 Parents: 43c7a42 Author: rathijit <rathi...@node-2.hashtable.quickstep-pg0.wisc.cloudlab.us> Authored: Mon Jul 4 02:44:48 2016 -0500 Committer: Harshad Deshmukh <hbdeshm...@apache.org> Committed: Tue Sep 20 12:56:06 2016 -0500 ---------------------------------------------------------------------- .../aggregation/AggregationConcreteHandle.cpp | 29 +- .../aggregation/AggregationConcreteHandle.hpp | 223 ++ expressions/aggregation/AggregationHandle.hpp | 8 +- .../aggregation/AggregationHandleAvg.cpp | 40 +- .../aggregation/AggregationHandleAvg.hpp | 62 +- .../aggregation/AggregationHandleCount.cpp | 38 +- .../aggregation/AggregationHandleCount.hpp | 50 +- .../aggregation/AggregationHandleDistinct.cpp | 2 +- .../aggregation/AggregationHandleDistinct.hpp | 2 +- .../aggregation/AggregationHandleMax.cpp | 29 +- .../aggregation/AggregationHandleMax.hpp | 39 +- .../aggregation/AggregationHandleMin.cpp | 30 +- .../aggregation/AggregationHandleMin.hpp | 44 +- .../aggregation/AggregationHandleSum.cpp | 31 +- .../aggregation/AggregationHandleSum.hpp | 52 +- expressions/aggregation/CMakeLists.txt | 7 + storage/AggregationOperationState.cpp | 95 +- storage/AggregationOperationState.hpp | 7 +- storage/CMakeLists.txt | 58 + storage/FastHashTable.hpp | 2640 ++++++++++++++++++ storage/FastHashTableFactory.hpp | 300 ++ storage/FastSeparateChainingHashTable.hpp | 1761 ++++++++++++ storage/HashTableBase.hpp | 2 +- storage/HashTablePool.hpp | 42 + storage/StorageBlock.cpp | 88 +- storage/StorageBlock.hpp | 8 + threading/SpinMutex.hpp | 2 + 27 files changed, 5587 insertions(+), 102 
deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/expressions/aggregation/AggregationConcreteHandle.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp index 719920f..1efe010 100644 --- a/expressions/aggregation/AggregationConcreteHandle.cpp +++ b/expressions/aggregation/AggregationConcreteHandle.cpp @@ -24,6 +24,7 @@ #include "catalog/CatalogTypedefs.hpp" #include "storage/HashTable.hpp" +#include "storage/FastHashTable.hpp" #include "storage/HashTableFactory.hpp" namespace quickstep { @@ -51,22 +52,24 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable( AggregationStateHashTableBase *distinctify_hash_table) const { // If the key-value pair is already there, we don't need to update the value, // which should always be "true". I.e. the value is just a placeholder. 
- const auto noop_upserter = [](const auto &accessor, const bool *value) -> void {}; +// const auto noop_upserter = [](const auto &accessor, const bool *value) -> void {}; - AggregationStateHashTable<bool> *hash_table = - static_cast<AggregationStateHashTable<bool>*>(distinctify_hash_table); + AggregationStateFastHashTable *hash_table = + static_cast<AggregationStateFastHashTable *>(distinctify_hash_table); if (key_ids.size() == 1) { - hash_table->upsertValueAccessor(accessor, - key_ids[0], - true /* check_for_null_keys */, - true /* initial_value */, - &noop_upserter); +// TODO(rathijit): fix +// hash_table->upsertValueAccessor(accessor, +// key_ids[0], +// true /* check_for_null_keys */, +// true /* initial_value */, +// &noop_upserter); } else { - hash_table->upsertValueAccessorCompositeKey(accessor, - key_ids, - true /* check_for_null_keys */, - true /* initial_value */, - &noop_upserter); + std::vector<std::vector<attribute_id>> empty_args; + empty_args.resize(1); + hash_table->upsertValueAccessorCompositeKeyFast(empty_args, + accessor, + key_ids, + true /* check_for_null_keys */); } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/expressions/aggregation/AggregationConcreteHandle.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp index c5ca061..d332ec9 100644 --- a/expressions/aggregation/AggregationConcreteHandle.hpp +++ b/expressions/aggregation/AggregationConcreteHandle.hpp @@ -31,6 +31,7 @@ #include "types/TypedValue.hpp" #include "types/containers/ColumnVector.hpp" #include "utility/Macros.hpp" +#include "threading/SpinMutex.hpp" #include "glog/logging.h" @@ -79,6 +80,37 @@ class HashTableStateUpserter { DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserter); }; +template <typename HandleT> +class HashTableStateUpserterFast { + public: + /** + * @brief Constructor. 
+ * + * @param handle The aggregation handle being used. + * @param source_state The aggregation state in the source aggregation hash + * table. The corresponding state (for the same key) in the destination + * hash table will be upserted. + **/ + HashTableStateUpserterFast(const HandleT &handle, const uint8_t *source_state) + : handle_(handle), source_state_(source_state) {} + + /** + * @brief The operator for the functor required for the upsert. + * + * @param destination_state The aggregation state in the aggregation hash + * table that is being upserted. + **/ + void operator()(uint8_t *destination_state) { + handle_.mergeStatesFast(source_state_, destination_state); + } + + private: + const HandleT &handle_; + const uint8_t *source_state_; + + DISALLOW_COPY_AND_ASSIGN(HashTableStateUpserterFast); +}; + /** * @brief A class to support the functor for merging group by hash tables. **/ @@ -129,6 +161,53 @@ class HashTableMerger { DISALLOW_COPY_AND_ASSIGN(HashTableMerger); }; +template <typename HandleT, typename HashTableT> +class HashTableMergerFast { + public: + /** + * @brief Constructor + * + * @param handle The Aggregation handle being used. + * @param destination_hash_table The destination hash table to which other + * hash tables will be merged. + **/ + HashTableMergerFast(const HandleT &handle, + AggregationStateHashTableBase *destination_hash_table) + : handle_(handle), + destination_hash_table_( + static_cast<HashTableT *>(destination_hash_table)) {} + + /** + * @brief The operator for the functor. + * + * @param group_by_key The group by key being merged. + * @param source_state The aggregation state for the given key in the source + * aggregation hash table. 
+ **/ + inline void operator()(const std::vector<TypedValue> &group_by_key, + const uint8_t *source_state) { + const uint8_t *original_state = + destination_hash_table_->getSingleCompositeKey(group_by_key); + if (original_state != nullptr) { + HashTableStateUpserterFast<HandleT> upserter( + handle_, source_state); + // The CHECK is required as upsertCompositeKey can return false if the + // hash table runs out of space during the upsert process. The ideal + // solution will be to retry again if the upsert fails. + CHECK(destination_hash_table_->upsertCompositeKeyFast( + group_by_key, original_state, &upserter)); + } else { + destination_hash_table_->putCompositeKeyFast(group_by_key, source_state); + } + } + + private: + const HandleT &handle_; + HashTableT *destination_hash_table_; + + DISALLOW_COPY_AND_ASSIGN(HashTableMergerFast); +}; + /** * @brief The helper intermediate subclass of AggregationHandle that provides * virtual method implementations as well as helper methods that are @@ -208,11 +287,26 @@ class AggregationConcreteHandle : public AggregationHandle { template <typename HandleT, typename HashTableT> + void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast( + const AggregationStateHashTableBase &distinctify_hash_table, + AggregationStateHashTableBase *hash_table) const; + + + template <typename HandleT, + typename HashTableT> ColumnVector* finalizeHashTableHelper( const Type &result_type, const AggregationStateHashTableBase &hash_table, std::vector<std::vector<TypedValue>> *group_by_keys) const; + template <typename HandleT, + typename HashTableT> + ColumnVector* finalizeHashTableHelperFast( + const Type &result_type, + const AggregationStateHashTableBase &hash_table, + std::vector<std::vector<TypedValue>> *group_by_keys, + int index) const; + template <typename HandleT, typename HashTableT> inline TypedValue finalizeGroupInHashTable( const AggregationStateHashTableBase &hash_table, @@ -224,11 +318,29 @@ class AggregationConcreteHandle : public 
AggregationHandle { return static_cast<const HandleT*>(this)->finalizeHashTableEntry(*group_state); } + template <typename HandleT, typename HashTableT> + inline TypedValue finalizeGroupInHashTableFast( + const AggregationStateHashTableBase &hash_table, + const std::vector<TypedValue> &group_key, + int index) const { + const std::uint8_t *group_state + = static_cast<const HashTableT&>(hash_table).getSingleCompositeKey(group_key, index); + DCHECK(group_state != nullptr) + << "Could not find entry for specified group_key in HashTable"; + return static_cast<const HandleT*>(this)->finalizeHashTableEntryFast(group_state); + } + template <typename HandleT, typename StateT, typename HashTableT> void mergeGroupByHashTablesHelper( const AggregationStateHashTableBase &source_hash_table, AggregationStateHashTableBase *destination_hash_table) const; + template <typename HandleT, typename HashTableT> + void mergeGroupByHashTablesHelperFast( + const AggregationStateHashTableBase &source_hash_table, + AggregationStateHashTableBase *destination_hash_table) const; + + private: DISALLOW_COPY_AND_ASSIGN(AggregationConcreteHandle); }; @@ -302,6 +414,12 @@ class HashTableAggregateFinalizer { output_column_vector_->appendTypedValue(handle_.finalizeHashTableEntry(group_state)); } + inline void operator()(const std::vector<TypedValue> &group_by_key, + const unsigned char *byte_ptr) { + group_by_keys_->emplace_back(group_by_key); + output_column_vector_->appendTypedValue(handle_.finalizeHashTableEntryFast(byte_ptr)); + } + private: const HandleT &handle_; std::vector<std::vector<TypedValue>> *group_by_keys_; @@ -414,6 +532,42 @@ void AggregationConcreteHandle::aggregateOnDistinctifyHashTableForGroupByUnaryHe template <typename HandleT, typename HashTableT> +void AggregationConcreteHandle::aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast( + const AggregationStateHashTableBase &distinctify_hash_table, + AggregationStateHashTableBase *aggregation_hash_table) const { + const HandleT& 
handle = static_cast<const HandleT&>(*this); + HashTableT *target_hash_table = static_cast<HashTableT*>(aggregation_hash_table); + + // A lambda function which will be called on each key-value pair from the + // distinctify hash table. + const auto aggregate_functor = [&handle, &target_hash_table]( + std::vector<TypedValue> &key, + const bool &dumb_placeholder) { + // For each (composite) key vector in the distinctify hash table with size N. + // The first N-1 entries are GROUP BY columns and the last entry is the argument + // to be aggregated on. + const TypedValue argument(std::move(key.back())); + key.pop_back(); + + // An upserter as lambda function for aggregating the argument into its + // GROUP BY group's entry inside aggregation_hash_table. + const auto upserter = [&handle, &argument](std::uint8_t *state) { + handle.iterateUnaryInlFast(argument, state+sizeof(SpinMutex)); + }; + + target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter); + }; + + const HashTableT &source_hash_table = + static_cast<const HashTableT&>(distinctify_hash_table); + // Invoke the lambda function "aggregate_functor" on each composite key vector + // from the distinctify hash table. 
+ source_hash_table.forEachCompositeKeyFast(&aggregate_functor); +} + + +template <typename HandleT, + typename HashTableT> ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper( const Type &result_type, const AggregationStateHashTableBase &hash_table, @@ -463,6 +617,59 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper( } template <typename HandleT, + typename HashTableT> +ColumnVector* AggregationConcreteHandle::finalizeHashTableHelperFast( + const Type &result_type, + const AggregationStateHashTableBase &hash_table, + std::vector<std::vector<TypedValue>> *group_by_keys, + int index) const { + const HandleT &handle = static_cast<const HandleT&>(*this); + const HashTableT &hash_table_concrete = static_cast<const HashTableT&>(hash_table); + + if (group_by_keys->empty()) { + if (NativeColumnVector::UsableForType(result_type)) { + NativeColumnVector *result = new NativeColumnVector(result_type, + hash_table_concrete.numEntries()); + HashTableAggregateFinalizer<HandleT, NativeColumnVector> finalizer( + handle, + group_by_keys, + result); + hash_table_concrete.forEachCompositeKeyFast(&finalizer, index); + return result; + } else { + IndirectColumnVector *result = new IndirectColumnVector(result_type, + hash_table_concrete.numEntries()); + HashTableAggregateFinalizer<HandleT, IndirectColumnVector> finalizer( + handle, + group_by_keys, + result); + hash_table_concrete.forEachCompositeKeyFast(&finalizer, index); + return result; + } + } else { + if (NativeColumnVector::UsableForType(result_type)) { + NativeColumnVector *result = new NativeColumnVector(result_type, + group_by_keys->size()); + for (const std::vector<TypedValue> &group_by_key : *group_by_keys) { + result->appendTypedValue(finalizeGroupInHashTableFast<HandleT, HashTableT>(hash_table, + group_by_key, + index)); + } + return result; + } else { + IndirectColumnVector *result = new IndirectColumnVector(result_type, + hash_table_concrete.numEntries()); + for (const 
std::vector<TypedValue> &group_by_key : *group_by_keys) { + result->appendTypedValue(finalizeGroupInHashTableFast<HandleT, HashTableT>(hash_table, + group_by_key, + index)); + } + return result; + } + } +} + +template <typename HandleT, typename StateT, typename HashTableT> void AggregationConcreteHandle::mergeGroupByHashTablesHelper( @@ -478,6 +685,22 @@ void AggregationConcreteHandle::mergeGroupByHashTablesHelper( source_hash_table_concrete.forEachCompositeKey(&merger); } +template <typename HandleT, + typename HashTableT> +void AggregationConcreteHandle::mergeGroupByHashTablesHelperFast( + const AggregationStateHashTableBase &source_hash_table, + AggregationStateHashTableBase *destination_hash_table) const { + const HandleT &handle = static_cast<const HandleT &>(*this); + const HashTableT &source_hash_table_concrete = + static_cast<const HashTableT &>(source_hash_table); + + HashTableMergerFast<HandleT, HashTableT> merger(handle, + destination_hash_table); + + source_hash_table_concrete.forEachCompositeKeyFast(&merger); +} + + } // namespace quickstep #endif // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/expressions/aggregation/AggregationHandle.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp index 3d6e872..92cd6a7 100644 --- a/expressions/aggregation/AggregationHandle.hpp +++ b/expressions/aggregation/AggregationHandle.hpp @@ -265,7 +265,7 @@ class AggregationHandle { **/ virtual ColumnVector* finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const = 0; + std::vector<std::vector<TypedValue>> *group_by_keys, int index) const = 0; /** * @brief Create a new HashTable for the distinctify step for DISTINCT aggregation. 
@@ -362,6 +362,12 @@ class AggregationHandle { const AggregationStateHashTableBase &source_hash_table, AggregationStateHashTableBase *destination_hash_table) const = 0; + virtual size_t getPayloadSize() const {return 8;} + virtual void setPayloadOffset(std::size_t) {} + virtual void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) {} + virtual void mergeStatesFast(const uint8_t *src, uint8_t *dst) const {} + virtual void initPayload(uint8_t *byte_ptr) {} + protected: AggregationHandle() { } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/expressions/aggregation/AggregationHandleAvg.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp index 4bd43d6..f38c628 100644 --- a/expressions/aggregation/AggregationHandleAvg.cpp +++ b/expressions/aggregation/AggregationHandleAvg.cpp @@ -137,8 +137,7 @@ void AggregationHandleAvg::aggregateValueAccessorIntoHashTable( AggregationStateHashTableBase *hash_table) const { DCHECK_EQ(1u, argument_ids.size()) << "Got wrong number of arguments for AVG: " << argument_ids.size(); - - aggregateValueAccessorIntoHashTableUnaryHelper< +/* aggregateValueAccessorIntoHashTableUnaryHelper< AggregationHandleAvg, AggregationStateAvg, AggregationStateHashTable<AggregationStateAvg>>( @@ -146,7 +145,14 @@ void AggregationHandleAvg::aggregateValueAccessorIntoHashTable( argument_ids.front(), group_by_key_ids, blank_state_, - hash_table); + hash_table); */ + +/* static_cast<AggregationStateFastHashTable *>(hash_table)->upsertValueAccessorCompositeKeyFast( + argument_ids.front(), + accessor, + group_by_key_ids, + true, + const_cast<AggregationHandleAvg *>(this));*/ } void AggregationHandleAvg::mergeStates( @@ -161,6 +167,19 @@ void AggregationHandleAvg::mergeStates( avg_source.sum_); } +void AggregationHandleAvg::mergeStatesFast( + const uint8_t *source, + uint8_t 
*destination) const { + const TypedValue *src_sum_ptr = reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset); + const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>(source + blank_state_.count_offset); + TypedValue *dst_sum_ptr = reinterpret_cast<TypedValue *>(destination+blank_state_.sum_offset); + std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>(destination + blank_state_.count_offset); + (*dst_count_ptr) += (*src_count_ptr); + *dst_sum_ptr = merge_add_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr); +} + + + TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const { const AggregationStateAvg &agg_state = static_cast<const AggregationStateAvg&>(state); if (agg_state.count_ == 0) { @@ -175,12 +194,14 @@ TypedValue AggregationHandleAvg::finalize(const AggregationState &state) const { ColumnVector* AggregationHandleAvg::finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const { - return finalizeHashTableHelper<AggregationHandleAvg, - AggregationStateHashTable<AggregationStateAvg>>( + std::vector<std::vector<TypedValue>> *group_by_keys, + int index) const { + return finalizeHashTableHelperFast<AggregationHandleAvg, + AggregationStateFastHashTable>( *result_type_, hash_table, - group_by_keys); + group_by_keys, + index); } AggregationState* AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle( @@ -206,9 +227,8 @@ void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy( void AggregationHandleAvg::mergeGroupByHashTables( const AggregationStateHashTableBase &source_hash_table, AggregationStateHashTableBase *destination_hash_table) const { - mergeGroupByHashTablesHelper<AggregationHandleAvg, - AggregationStateAvg, - AggregationStateHashTable<AggregationStateAvg>>( + mergeGroupByHashTablesHelperFast<AggregationHandleAvg, + AggregationStateFastHashTable>( source_hash_table, 
destination_hash_table); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/expressions/aggregation/AggregationHandleAvg.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp index 31997b1..e187d22 100644 --- a/expressions/aggregation/AggregationHandleAvg.hpp +++ b/expressions/aggregation/AggregationHandleAvg.hpp @@ -29,6 +29,7 @@ #include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" #include "storage/HashTableBase.hpp" +#include "storage/FastHashTable.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" #include "types/TypedValue.hpp" @@ -57,7 +58,10 @@ class AggregationStateAvg : public AggregationState { */ AggregationStateAvg(const AggregationStateAvg &orig) : sum_(orig.sum_), - count_(orig.count_) { + count_(orig.count_), + sum_offset(orig.sum_offset), + count_offset(orig.count_offset), + mutex_offset(orig.mutex_offset) { } /** @@ -65,11 +69,19 @@ class AggregationStateAvg : public AggregationState { */ ~AggregationStateAvg() override {} + size_t getPayloadSize() const { + size_t p1 = reinterpret_cast<size_t>(&sum_); + size_t p2 = reinterpret_cast<size_t>(&mutex_); + return (p2-p1); + } + private: friend class AggregationHandleAvg; AggregationStateAvg() - : sum_(0), count_(0) { + : sum_(0), count_(0), sum_offset(0), + count_offset(reinterpret_cast<uint8_t *>(&count_)-reinterpret_cast<uint8_t *>(&sum_)), + mutex_offset(reinterpret_cast<uint8_t *>(&mutex_)-reinterpret_cast<uint8_t *>(&sum_)) { } // TODO(shoban): We might want to specialize sum_ and count_ to use atomics @@ -77,6 +89,8 @@ class AggregationStateAvg : public AggregationState { TypedValue sum_; std::int64_t count_; SpinMutex mutex_; + + int sum_offset, count_offset, mutex_offset; }; /** @@ -109,6 +123,26 @@ class AggregationHandleAvg : public 
AggregationConcreteHandle { ++state->count_; } + inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) { + DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature())); + if (value.isNull()) return; + TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset); + std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset); + *sum_ptr = fast_add_operator_->applyToTypedValues(*sum_ptr, value); + ++(*count_ptr); + } + + inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override { + iterateUnaryInlFast(arguments.front(), byte_ptr); + } + + void initPayload(uint8_t *byte_ptr) override { + TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset); + std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset); + *sum_ptr = blank_state_.sum_; + *count_ptr = blank_state_.count_; + } + AggregationState* accumulateColumnVectors( const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override; @@ -127,6 +161,9 @@ class AggregationHandleAvg : public AggregationConcreteHandle { void mergeStates(const AggregationState &source, AggregationState *destination) const override; + void mergeStatesFast(const uint8_t *source, + uint8_t *destination) const override; + TypedValue finalize(const AggregationState &state) const override; inline TypedValue finalizeHashTableEntry(const AggregationState &state) const { @@ -139,9 +176,24 @@ class AggregationHandleAvg : public AggregationConcreteHandle { TypedValue(static_cast<double>(agg_state.count_))); } + inline TypedValue finalizeHashTableEntryFast(const uint8_t *byte_ptr) const { +// const AggregationStateAvg &agg_state = static_cast<const AggregationStateAvg&>(state); + // TODO(chasseur): Could improve performance further if we made a special + // version of finalizeHashTable() that collects all the sums into one + // 
ColumnVector and all the counts into another and then applies + // '*divide_operator_' to them in bulk. + + uint8_t *value_ptr = const_cast<uint8_t*>(byte_ptr); + TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset); + std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(value_ptr + blank_state_.count_offset); + return divide_operator_->applyToTypedValues(*sum_ptr, + TypedValue(static_cast<double>(*count_ptr))); + } + ColumnVector* finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const override; + std::vector<std::vector<TypedValue>> *group_by_keys, + int index) const override; /** * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle() @@ -162,6 +214,10 @@ class AggregationHandleAvg : public AggregationConcreteHandle { const AggregationStateHashTableBase &source_hash_table, AggregationStateHashTableBase *destination_hash_table) const override; + size_t getPayloadSize() const override { + return blank_state_.getPayloadSize(); + } + private: friend class AggregateFunctionAvg; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/expressions/aggregation/AggregationHandleCount.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp index dfcf131..f1eadf1 100644 --- a/expressions/aggregation/AggregationHandleCount.cpp +++ b/expressions/aggregation/AggregationHandleCount.cpp @@ -135,18 +135,18 @@ template <bool count_star, bool nullable_type> if (count_star) { DCHECK_EQ(0u, argument_ids.size()) << "Got wrong number of arguments for COUNT(*): " << argument_ids.size(); - aggregateValueAccessorIntoHashTableNullaryHelper< +/* aggregateValueAccessorIntoHashTableNullaryHelper< AggregationHandleCount<count_star, nullable_type>, AggregationStateCount, 
AggregationStateHashTable<AggregationStateCount>>( accessor, group_by_key_ids, AggregationStateCount(), - hash_table); + hash_table);*/ } else { DCHECK_EQ(1u, argument_ids.size()) << "Got wrong number of arguments for COUNT: " << argument_ids.size(); - aggregateValueAccessorIntoHashTableUnaryHelper< +/* aggregateValueAccessorIntoHashTableUnaryHelper< AggregationHandleCount<count_star, nullable_type>, AggregationStateCount, AggregationStateHashTable<AggregationStateCount>>( @@ -154,7 +154,7 @@ template <bool count_star, bool nullable_type> argument_ids.front(), group_by_key_ids, AggregationStateCount(), - hash_table); + hash_table); */ } } @@ -170,14 +170,25 @@ template <bool count_star, bool nullable_type> } template <bool count_star, bool nullable_type> +void AggregationHandleCount<count_star, nullable_type>::mergeStatesFast( + const uint8_t *source, + uint8_t *destination) const { + const std::int64_t *src_count_ptr = reinterpret_cast<const std::int64_t *>(source); + std::int64_t *dst_count_ptr = reinterpret_cast<std::int64_t *>(destination); + (*dst_count_ptr) += (*src_count_ptr); +} + +template <bool count_star, bool nullable_type> ColumnVector* AggregationHandleCount<count_star, nullable_type>::finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const { - return finalizeHashTableHelper<AggregationHandleCount<count_star, nullable_type>, - AggregationStateHashTable<AggregationStateCount>>( + std::vector<std::vector<TypedValue>> *group_by_keys, + int index) const { + return finalizeHashTableHelperFast<AggregationHandleCount<count_star, nullable_type>, + AggregationStateFastHashTable>( TypeFactory::GetType(kLong), hash_table, - group_by_keys); + group_by_keys, + index); } template <bool count_star, bool nullable_type> @@ -197,12 +208,10 @@ void AggregationHandleCount<count_star, nullable_type> const AggregationStateHashTableBase &distinctify_hash_table, AggregationStateHashTableBase 
*aggregation_hash_table) const { DCHECK_EQ(count_star, false); - aggregateOnDistinctifyHashTableForGroupByUnaryHelper< + aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< AggregationHandleCount<count_star, nullable_type>, - AggregationStateCount, - AggregationStateHashTable<AggregationStateCount>>( + AggregationStateFastHashTable>( distinctify_hash_table, - AggregationStateCount(), aggregation_hash_table); } @@ -210,10 +219,9 @@ template <bool count_star, bool nullable_type> void AggregationHandleCount<count_star, nullable_type>::mergeGroupByHashTables( const AggregationStateHashTableBase &source_hash_table, AggregationStateHashTableBase *destination_hash_table) const { - mergeGroupByHashTablesHelper< + mergeGroupByHashTablesHelperFast< AggregationHandleCount, - AggregationStateCount, - AggregationStateHashTable<AggregationStateCount>>(source_hash_table, + AggregationStateFastHashTable>(source_hash_table, destination_hash_table); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/expressions/aggregation/AggregationHandleCount.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp index 1cd5bda..ed21c41 100644 --- a/expressions/aggregation/AggregationHandleCount.hpp +++ b/expressions/aggregation/AggregationHandleCount.hpp @@ -30,6 +30,7 @@ #include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" #include "storage/HashTableBase.hpp" +#include "storage/FastHashTable.hpp" #include "types/TypedValue.hpp" #include "utility/Macros.hpp" @@ -62,6 +63,10 @@ class AggregationStateCount : public AggregationState { */ ~AggregationStateCount() override {} + size_t getPayloadSize() const { + return sizeof(count_); + } + private: friend class AggregationHandleCount<false, false>; friend class AggregationHandleCount<false, true>; @@ -108,6 +113,11 @@ 
class AggregationHandleCount : public AggregationConcreteHandle { state->count_.fetch_add(1, std::memory_order_relaxed); } + inline void iterateNullaryInlFast(uint8_t *byte_ptr) { + std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr); + (*count_ptr)++; + } + /** * @brief Iterate with count aggregation state. */ @@ -117,6 +127,25 @@ class AggregationHandleCount : public AggregationConcreteHandle { } } + inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) const { + if ((!nullable_type) || (!value.isNull())) { + std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr); + (*count_ptr)++; + } + } + + inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override { + if (arguments.size()) + iterateUnaryInlFast(arguments.front(), byte_ptr); + else + iterateNullaryInlFast(byte_ptr); + } + + void initPayload(uint8_t *byte_ptr) override { + std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr); + *count_ptr = 0; + } + AggregationState* accumulateNullary(const std::size_t num_tuples) const override { return new AggregationStateCount(num_tuples); } @@ -139,6 +168,9 @@ class AggregationHandleCount : public AggregationConcreteHandle { void mergeStates(const AggregationState &source, AggregationState *destination) const override; + void mergeStatesFast(const uint8_t *source, + uint8_t *destination) const override; + TypedValue finalize(const AggregationState &state) const override { return TypedValue(static_cast<const AggregationStateCount&>(state).count_.load(std::memory_order_relaxed)); } @@ -147,9 +179,21 @@ class AggregationHandleCount : public AggregationConcreteHandle { return TypedValue(static_cast<const AggregationStateCount&>(state).count_.load(std::memory_order_relaxed)); } + inline TypedValue finalizeHashTableEntryFast(const uint8_t *byte_ptr) const { +// const AggregationStateAvg &agg_state = static_cast<const AggregationStateAvg&>(state); + // TODO(chasseur): Could 
improve performance further if we made a special + // version of finalizeHashTable() that collects all the sums into one + // ColumnVector and all the counts into another and then applies + // '*divide_operator_' to them in bulk. + + const std::int64_t *count_ptr = reinterpret_cast<const std::int64_t *>(byte_ptr); + return TypedValue(*count_ptr); + } + ColumnVector* finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const override; + std::vector<std::vector<TypedValue>> *group_by_keys, + int index) const override; /** * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle() @@ -170,6 +214,10 @@ class AggregationHandleCount : public AggregationConcreteHandle { const AggregationStateHashTableBase &source_hash_table, AggregationStateHashTableBase *destination_hash_table) const override; + size_t getPayloadSize() const override { + return sizeof(std::int64_t); + } + private: friend class AggregateFunctionCount; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/expressions/aggregation/AggregationHandleDistinct.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp index 68fcd4c..2b9391a 100644 --- a/expressions/aggregation/AggregationHandleDistinct.cpp +++ b/expressions/aggregation/AggregationHandleDistinct.cpp @@ -65,7 +65,7 @@ void AggregationHandleDistinct::aggregateValueAccessorIntoHashTable( ColumnVector* AggregationHandleDistinct::finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const { + std::vector<std::vector<TypedValue>> *group_by_keys, int index) const { DCHECK(group_by_keys->empty()); const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key, 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/expressions/aggregation/AggregationHandleDistinct.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp index 8524fcc..0a3acb3 100644 --- a/expressions/aggregation/AggregationHandleDistinct.hpp +++ b/expressions/aggregation/AggregationHandleDistinct.hpp @@ -109,7 +109,7 @@ class AggregationHandleDistinct : public AggregationConcreteHandle { ColumnVector* finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const override; + std::vector<std::vector<TypedValue>> *group_by_keys, int index) const override; void mergeGroupByHashTables( const AggregationStateHashTableBase &source_hash_table, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/expressions/aggregation/AggregationHandleMax.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp index 435f5f2..2080a03 100644 --- a/expressions/aggregation/AggregationHandleMax.cpp +++ b/expressions/aggregation/AggregationHandleMax.cpp @@ -89,7 +89,7 @@ void AggregationHandleMax::aggregateValueAccessorIntoHashTable( DCHECK_EQ(1u, argument_ids.size()) << "Got wrong number of arguments for MAX: " << argument_ids.size(); - aggregateValueAccessorIntoHashTableUnaryHelper< +/* aggregateValueAccessorIntoHashTableUnaryHelper< AggregationHandleMax, AggregationStateMax, AggregationStateHashTable<AggregationStateMax>>( @@ -97,7 +97,7 @@ void AggregationHandleMax::aggregateValueAccessorIntoHashTable( argument_ids.front(), group_by_key_ids, AggregationStateMax(type_), - hash_table); + hash_table);*/ } void AggregationHandleMax::mergeStates( @@ -111,14 +111,26 @@ void AggregationHandleMax::mergeStates( } } 
+void AggregationHandleMax::mergeStatesFast( + const std::uint8_t *source, + std::uint8_t *destination) const { + const TypedValue *src_max_ptr = reinterpret_cast<const TypedValue *>(source); + TypedValue *dst_max_ptr = reinterpret_cast<TypedValue *>(destination); + if (!(src_max_ptr->isNull())) { + compareAndUpdateFast(dst_max_ptr, *src_max_ptr); + } +} + ColumnVector* AggregationHandleMax::finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const { - return finalizeHashTableHelper<AggregationHandleMax, - AggregationStateHashTable<AggregationStateMax>>( + std::vector<std::vector<TypedValue>> *group_by_keys, + int index) const { + return finalizeHashTableHelperFast<AggregationHandleMax, + AggregationStateFastHashTable>( type_.getNullableVersion(), hash_table, - group_by_keys); + group_by_keys, + index); } AggregationState* AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle( @@ -144,9 +156,8 @@ void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy( void AggregationHandleMax::mergeGroupByHashTables( const AggregationStateHashTableBase &source_hash_table, AggregationStateHashTableBase *destination_hash_table) const { - mergeGroupByHashTablesHelper<AggregationHandleMax, - AggregationStateMax, - AggregationStateHashTable<AggregationStateMax>>( + mergeGroupByHashTablesHelperFast<AggregationHandleMax, + AggregationStateFastHashTable>( source_hash_table, destination_hash_table); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/expressions/aggregation/AggregationHandleMax.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp index 7e38473..3c06fc4 100644 --- a/expressions/aggregation/AggregationHandleMax.hpp +++ b/expressions/aggregation/AggregationHandleMax.hpp @@ -29,6 +29,7 @@ #include 
"expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" #include "storage/HashTableBase.hpp" +#include "storage/FastHashTable.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" #include "types/TypedValue.hpp" @@ -105,6 +106,22 @@ class AggregationHandleMax : public AggregationConcreteHandle { compareAndUpdate(static_cast<AggregationStateMax*>(state), value); } + inline void iterateUnaryInlFast(const TypedValue &value, std::uint8_t *byte_ptr) const { + DCHECK(value.isPlausibleInstanceOf(type_.getSignature())); + TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr); + compareAndUpdateFast(max_ptr, value); + } + + inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override { + iterateUnaryInlFast(arguments.front(), byte_ptr); + } + + void initPayload(uint8_t *byte_ptr) override { + TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr); + TypedValue t1 = (type_.getNullableVersion().makeNullValue()); + *max_ptr = t1; + } + AggregationState* accumulateColumnVectors( const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override; @@ -123,6 +140,9 @@ class AggregationHandleMax : public AggregationConcreteHandle { void mergeStates(const AggregationState &source, AggregationState *destination) const override; + void mergeStatesFast(const std::uint8_t *source, + std::uint8_t *destination) const override; + TypedValue finalize(const AggregationState &state) const override { return TypedValue(static_cast<const AggregationStateMax&>(state).max_); } @@ -131,9 +151,15 @@ class AggregationHandleMax : public AggregationConcreteHandle { return TypedValue(static_cast<const AggregationStateMax&>(state).max_); } + inline TypedValue finalizeHashTableEntryFast(const std::uint8_t *byte_ptr) const { + const TypedValue *max_ptr = reinterpret_cast<const TypedValue *>(byte_ptr); + return TypedValue(*max_ptr); + } + ColumnVector* finalizeHashTable( 
const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const override; + std::vector<std::vector<TypedValue>> *group_by_keys, + int index) const override; /** * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle() @@ -155,6 +181,10 @@ class AggregationHandleMax : public AggregationConcreteHandle { const AggregationStateHashTableBase &source_hash_table, AggregationStateHashTableBase *destination_hash_table) const override; + size_t getPayloadSize() const override { + return sizeof(TypedValue); + } + private: friend class AggregateFunctionMax; @@ -181,6 +211,13 @@ class AggregationHandleMax : public AggregationConcreteHandle { } } + inline void compareAndUpdateFast(TypedValue *max_ptr, const TypedValue &value) const { + if (value.isNull()) return; + if (max_ptr->isNull() || fast_comparator_->compareTypedValues(value, *max_ptr)) { + *max_ptr = value; + } + } + const Type &type_; std::unique_ptr<UncheckedComparator> fast_comparator_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/expressions/aggregation/AggregationHandleMin.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp index e860d8d..9d5be72 100644 --- a/expressions/aggregation/AggregationHandleMin.cpp +++ b/expressions/aggregation/AggregationHandleMin.cpp @@ -89,7 +89,7 @@ void AggregationHandleMin::aggregateValueAccessorIntoHashTable( DCHECK_EQ(1u, argument_ids.size()) << "Got wrong number of arguments for MIN: " << argument_ids.size(); - aggregateValueAccessorIntoHashTableUnaryHelper< +/* aggregateValueAccessorIntoHashTableUnaryHelper< AggregationHandleMin, AggregationStateMin, AggregationStateHashTable<AggregationStateMin>>( @@ -97,7 +97,7 @@ void AggregationHandleMin::aggregateValueAccessorIntoHashTable( argument_ids.front(), group_by_key_ids, 
AggregationStateMin(type_), - hash_table); + hash_table);*/ } void AggregationHandleMin::mergeStates( @@ -111,14 +111,27 @@ void AggregationHandleMin::mergeStates( } } +void AggregationHandleMin::mergeStatesFast( + const std::uint8_t *source, + std::uint8_t *destination) const { + const TypedValue *src_min_ptr = reinterpret_cast<const TypedValue *>(source); + TypedValue *dst_min_ptr = reinterpret_cast<TypedValue *>(destination); + + if (!(src_min_ptr->isNull())) { + compareAndUpdateFast(dst_min_ptr, *src_min_ptr); + } +} + ColumnVector* AggregationHandleMin::finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const { - return finalizeHashTableHelper<AggregationHandleMin, - AggregationStateHashTable<AggregationStateMin>>( + std::vector<std::vector<TypedValue>> *group_by_keys, + int index) const { + return finalizeHashTableHelperFast<AggregationHandleMin, + AggregationStateFastHashTable>( type_.getNonNullableVersion(), hash_table, - group_by_keys); + group_by_keys, + index); } AggregationState* AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle( @@ -144,9 +157,8 @@ void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy( void AggregationHandleMin::mergeGroupByHashTables( const AggregationStateHashTableBase &source_hash_table, AggregationStateHashTableBase *destination_hash_table) const { - mergeGroupByHashTablesHelper<AggregationHandleMin, - AggregationStateMin, - AggregationStateHashTable<AggregationStateMin>>( + mergeGroupByHashTablesHelperFast<AggregationHandleMin, + AggregationStateFastHashTable>( source_hash_table, destination_hash_table); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/expressions/aggregation/AggregationHandleMin.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp index 924698c..6329cd7 
100644 --- a/expressions/aggregation/AggregationHandleMin.hpp +++ b/expressions/aggregation/AggregationHandleMin.hpp @@ -29,6 +29,7 @@ #include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" #include "storage/HashTableBase.hpp" +#include "storage/FastHashTable.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" #include "types/TypedValue.hpp" @@ -64,6 +65,11 @@ class AggregationStateMin : public AggregationState { */ ~AggregationStateMin() override {} + size_t getPayloadSize() const { + return sizeof(TypedValue); + } + + private: friend class AggregationHandleMin; @@ -104,6 +110,22 @@ class AggregationHandleMin : public AggregationConcreteHandle { compareAndUpdate(state, value); } + inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) { + DCHECK(value.isPlausibleInstanceOf(type_.getSignature())); + TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr); + compareAndUpdateFast(min_ptr, value); + } + + inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override { + iterateUnaryInlFast(arguments.front(), byte_ptr); + } + + void initPayload(uint8_t *byte_ptr) override { + TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr); + TypedValue t1 = (type_.getNullableVersion().makeNullValue()); + *min_ptr = t1; + } + AggregationState* accumulateColumnVectors( const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override; @@ -122,6 +144,9 @@ class AggregationHandleMin : public AggregationConcreteHandle { void mergeStates(const AggregationState &source, AggregationState *destination) const override; + void mergeStatesFast(const uint8_t *source, + uint8_t *destination) const override; + TypedValue finalize(const AggregationState &state) const override { return static_cast<const AggregationStateMin&>(state).min_; } @@ -130,9 +155,15 @@ class AggregationHandleMin : public AggregationConcreteHandle { return 
static_cast<const AggregationStateMin&>(state).min_; } + inline TypedValue finalizeHashTableEntryFast(const std::uint8_t *byte_ptr) const { + const TypedValue *min_ptr = reinterpret_cast<const TypedValue *>(byte_ptr); + return TypedValue(*min_ptr); + } + ColumnVector* finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const override; + std::vector<std::vector<TypedValue>> *group_by_keys, + int index) const override; /** * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle() @@ -153,6 +184,10 @@ class AggregationHandleMin : public AggregationConcreteHandle { const AggregationStateHashTableBase &source_hash_table, AggregationStateHashTableBase *destination_hash_table) const override; + size_t getPayloadSize() const override { + return sizeof(TypedValue); + } + private: friend class AggregateFunctionMin; @@ -178,6 +213,13 @@ class AggregationHandleMin : public AggregationConcreteHandle { } } + inline void compareAndUpdateFast(TypedValue *min_ptr, const TypedValue &value) const { + if (value.isNull()) return; + if (min_ptr->isNull() || fast_comparator_->compareTypedValues(value, *min_ptr)) { + *min_ptr = value; + } + } + const Type &type_; std::unique_ptr<UncheckedComparator> fast_comparator_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/expressions/aggregation/AggregationHandleSum.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp index b5036a8..7a16605 100644 --- a/expressions/aggregation/AggregationHandleSum.cpp +++ b/expressions/aggregation/AggregationHandleSum.cpp @@ -93,7 +93,6 @@ AggregationState* AggregationHandleSum::accumulateColumnVectors( const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const { DCHECK_EQ(1u, column_vectors.size()) << "Got wrong number of ColumnVectors for 
SUM: " << column_vectors.size(); - std::size_t num_tuples = 0; TypedValue cv_sum = fast_operator_->accumulateColumnVector( blank_state_.sum_, @@ -127,7 +126,7 @@ void AggregationHandleSum::aggregateValueAccessorIntoHashTable( DCHECK_EQ(1u, argument_ids.size()) << "Got wrong number of arguments for SUM: " << argument_ids.size(); - aggregateValueAccessorIntoHashTableUnaryHelper< +/* aggregateValueAccessorIntoHashTableUnaryHelper< AggregationHandleSum, AggregationStateSum, AggregationStateHashTable<AggregationStateSum>>( @@ -135,7 +134,7 @@ void AggregationHandleSum::aggregateValueAccessorIntoHashTable( argument_ids.front(), group_by_key_ids, blank_state_, - hash_table); + hash_table);*/ } void AggregationHandleSum::mergeStates( @@ -150,6 +149,17 @@ void AggregationHandleSum::mergeStates( sum_destination->null_ = sum_destination->null_ && sum_source.null_; } +void AggregationHandleSum::mergeStatesFast( + const uint8_t *source, + uint8_t *destination) const { + const TypedValue *src_sum_ptr = reinterpret_cast<const TypedValue *>(source+blank_state_.sum_offset); + const bool *src_null_ptr = reinterpret_cast<const bool *>(source+blank_state_.null_offset); + TypedValue *dst_sum_ptr = reinterpret_cast<TypedValue *>(destination+blank_state_.sum_offset); + bool *dst_null_ptr = reinterpret_cast<bool *>(destination+blank_state_.null_offset); + *dst_sum_ptr = merge_operator_->applyToTypedValues(*dst_sum_ptr, *src_sum_ptr); + *dst_null_ptr = (*dst_null_ptr) && (*src_null_ptr); +} + TypedValue AggregationHandleSum::finalize(const AggregationState &state) const { const AggregationStateSum &agg_state = static_cast<const AggregationStateSum&>(state); if (agg_state.null_) { @@ -162,12 +172,14 @@ TypedValue AggregationHandleSum::finalize(const AggregationState &state) const { ColumnVector* AggregationHandleSum::finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const { - return 
finalizeHashTableHelper<AggregationHandleSum, - AggregationStateHashTable<AggregationStateSum>>( + std::vector<std::vector<TypedValue>> *group_by_keys, + int index) const { + return finalizeHashTableHelperFast<AggregationHandleSum, + AggregationStateFastHashTable>( *result_type_, hash_table, - group_by_keys); + group_by_keys, + index); } AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle( @@ -193,9 +205,8 @@ void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy( void AggregationHandleSum::mergeGroupByHashTables( const AggregationStateHashTableBase &source_hash_table, AggregationStateHashTableBase *destination_hash_table) const { - mergeGroupByHashTablesHelper<AggregationHandleSum, - AggregationStateSum, - AggregationStateHashTable<AggregationStateSum>>( + mergeGroupByHashTablesHelperFast<AggregationHandleSum, + AggregationStateFastHashTable>( source_hash_table, destination_hash_table); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/expressions/aggregation/AggregationHandleSum.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp index 3382646..79f8331 100644 --- a/expressions/aggregation/AggregationHandleSum.hpp +++ b/expressions/aggregation/AggregationHandleSum.hpp @@ -29,6 +29,7 @@ #include "expressions/aggregation/AggregationConcreteHandle.hpp" #include "expressions/aggregation/AggregationHandle.hpp" #include "storage/HashTableBase.hpp" +#include "storage/FastHashTable.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" #include "types/TypedValue.hpp" @@ -57,27 +58,39 @@ class AggregationStateSum : public AggregationState { */ AggregationStateSum(const AggregationStateSum &orig) : sum_(orig.sum_), - null_(orig.null_) { + null_(orig.null_), + sum_offset(orig.sum_offset), + null_offset(orig.null_offset) { } private: friend class 
AggregationHandleSum; AggregationStateSum() - : sum_(0), null_(true) { + : sum_(0), null_(true), sum_offset(0), + null_offset(reinterpret_cast<uint8_t *>(&null_)-reinterpret_cast<uint8_t *>(&sum_)) { } AggregationStateSum(TypedValue &&sum, const bool is_null) : sum_(std::move(sum)), null_(is_null) { } + size_t getPayloadSize() const { + size_t p1 = reinterpret_cast<size_t>(&sum_); + size_t p2 = reinterpret_cast<size_t>(&mutex_); + return (p2-p1); + } + // TODO(shoban): We might want to specialize sum_ to use atomics for int types // similar to in AggregationStateCount. TypedValue sum_; bool null_; SpinMutex mutex_; + + int sum_offset, null_offset; }; + /** * @brief An aggregationhandle for sum. **/ @@ -105,6 +118,26 @@ class AggregationHandleSum : public AggregationConcreteHandle { state->null_ = false; } + inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) { + DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature())); + if (value.isNull()) return; + TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset); + bool *null_ptr = reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset); + *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, value); + *null_ptr = false; + } + + inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override { + iterateUnaryInlFast(arguments.front(), byte_ptr); + } + + void initPayload(uint8_t *byte_ptr) override { + TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset); + bool *null_ptr = reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset); + *sum_ptr = blank_state_.sum_; + *null_ptr = true; + } + AggregationState* accumulateColumnVectors( const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const override; @@ -123,15 +156,24 @@ class AggregationHandleSum : public AggregationConcreteHandle { void mergeStates(const AggregationState &source, AggregationState *destination) 
const override; + void mergeStatesFast(const uint8_t *source, + uint8_t *destination) const override; + TypedValue finalize(const AggregationState &state) const override; inline TypedValue finalizeHashTableEntry(const AggregationState &state) const { return static_cast<const AggregationStateSum&>(state).sum_; } + inline TypedValue finalizeHashTableEntryFast(const uint8_t *byte_ptr) const { + uint8_t *value_ptr = const_cast<uint8_t*>(byte_ptr); + TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset); + return *sum_ptr; + } + ColumnVector* finalizeHashTable( const AggregationStateHashTableBase &hash_table, - std::vector<std::vector<TypedValue>> *group_by_keys) const override; + std::vector<std::vector<TypedValue>> *group_by_keys, int index) const override; /** * @brief Implementation of AggregationHandle::aggregateOnDistinctifyHashTableForSingle() @@ -152,6 +194,10 @@ class AggregationHandleSum : public AggregationConcreteHandle { const AggregationStateHashTableBase &source_hash_table, AggregationStateHashTableBase *destination_hash_table) const override; + size_t getPayloadSize() const override { + return blank_state_.getPayloadSize(); + } + private: friend class AggregateFunctionSum; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/expressions/aggregation/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt index 888d95c..98222df 100644 --- a/expressions/aggregation/CMakeLists.txt +++ b/expressions/aggregation/CMakeLists.txt @@ -146,9 +146,11 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl glog quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationHandle + quickstep_storage_FastHashTable quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory + quickstep_threading_SpinMutex 
quickstep_types_TypedValue quickstep_types_containers_ColumnVector quickstep_utility_Macros) @@ -163,6 +165,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle + quickstep_storage_FastHashTable quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory @@ -180,6 +183,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle + quickstep_storage_FastHashTable quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory @@ -204,6 +208,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle + quickstep_storage_FastHashTable quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory @@ -220,6 +225,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle + quickstep_storage_FastHashTable quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory @@ -236,6 +242,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationConcreteHandle quickstep_expressions_aggregation_AggregationHandle + quickstep_storage_FastHashTable quickstep_storage_HashTable quickstep_storage_HashTableBase quickstep_storage_HashTableFactory 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index 3f6e23a..7d6d179 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -83,6 +83,9 @@ AggregationOperationState::AggregationOperationState( group_by_types.emplace_back(&group_by_element->getType()); } + std::vector<AggregationHandle *> group_by_handles; + group_by_handles.clear(); + if (aggregate_functions.size() == 0) { // If there is no aggregation function, then it is a distinctify operation // on the group-by expressions. @@ -92,11 +95,17 @@ AggregationOperationState::AggregationOperationState( arguments_.push_back({}); is_distinct_.emplace_back(false); - group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>( + /* group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>( new HashTablePool(estimated_num_entries, hash_table_impl_type, group_by_types, handles_.back().get(), + storage_manager)));*/ + group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>( + new HashTablePool(estimated_num_entries, + hash_table_impl_type, + group_by_types, + handles_.back(), storage_manager))); } else { // Set up each individual aggregate in this operation. @@ -107,6 +116,7 @@ AggregationOperationState::AggregationOperationState( std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin(); std::vector<HashTableImplType>::const_iterator distinctify_hash_table_impl_types_it = distinctify_hash_table_impl_types.begin(); + std::vector<std::size_t> payload_sizes; for (; agg_func_it != aggregate_functions.end(); ++agg_func_it, ++args_it, ++is_distinct_it) { // Get the Types of this aggregate's arguments so that we can create an // AggregationHandle. 
@@ -126,12 +136,15 @@ AggregationOperationState::AggregationOperationState( if (!group_by_list_.empty()) { // Aggregation with GROUP BY: create a HashTable pool for per-group states. - group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>( + /* group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>( new HashTablePool(estimated_num_entries, hash_table_impl_type, group_by_types, handles_.back().get(), - storage_manager))); + storage_manager)));*/ + group_by_handles.emplace_back(handles_.back()); + payload_sizes.emplace_back(handles_.back()->getPayloadSize()); + } else { // Aggregation without GROUP BY: create a single global state. single_states_.emplace_back(handles_.back()->createInitialState()); @@ -166,17 +179,40 @@ AggregationOperationState::AggregationOperationState( // the number of entries in the distinctify hash table. We may estimate // for each distinct aggregation an estimated_num_distinct_keys value during // query optimization, if it worths. - distinctify_hashtables_.emplace_back( + /* distinctify_hashtables_.emplace_back( handles_.back()->createDistinctifyHashTable( *distinctify_hash_table_impl_types_it, key_types, estimated_num_entries, + storage_manager));*/ + +std::vector<AggregationHandle *> local; +local.emplace_back(handles_.back()); + distinctify_hashtables_.emplace_back( +AggregationStateFastHashTableFactory::CreateResizable( + *distinctify_hash_table_impl_types_it, + key_types, + estimated_num_entries, + {0}, + local, storage_manager)); + ++distinctify_hash_table_impl_types_it; } else { distinctify_hashtables_.emplace_back(nullptr); } } + + if (!group_by_handles.empty()) { + // Aggregation with GROUP BY: create a HashTable pool for per-group states. 
+ group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>( + new HashTablePool(estimated_num_entries, + hash_table_impl_type, + group_by_types, + payload_sizes, + group_by_handles, + storage_manager))); + } } } @@ -410,17 +446,24 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo // Call StorageBlock::aggregateGroupBy() to aggregate this block's values // directly into the (threadsafe) shared global HashTable for this // aggregate. - DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr); - AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable(); + DCHECK(group_by_hashtable_pools_[0] != nullptr); + AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[0]->getHashTableFast(); DCHECK(agg_hash_table != nullptr); - block->aggregateGroupBy(*handles_[agg_idx], + /* block->aggregateGroupBy(*handles_[agg_idx], arguments_[agg_idx], group_by_list_, predicate_.get(), agg_hash_table, &reuse_matches, + &reuse_group_by_vectors);*/ + block->aggregateGroupByFast(arguments_, + group_by_list_, + predicate_.get(), + agg_hash_table, + &reuse_matches, &reuse_group_by_vectors); - group_by_hashtable_pools_[agg_idx]->returnHashTable(agg_hash_table); + group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table); + break; } } } @@ -444,6 +487,12 @@ void AggregationOperationState::finalizeSingleState(InsertDestination *output_de output_destination->insertTuple(Tuple(std::move(attribute_values))); } +void AggregationOperationState::mergeGroupByHashTables(AggregationStateHashTableBase *src, + AggregationStateHashTableBase *dst) { + HashTableMergerNewFast merger(dst); + (static_cast<FastHashTable<true, false, true, false> *>(src))->forEachCompositeKeyFast(&merger); +} + void AggregationOperationState::finalizeHashTable(InsertDestination *output_destination) { // Each element of 'group_by_keys' is a vector of values for a particular // group (which is also the prefix of the finalized 
Tuple for that group). @@ -455,18 +504,21 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest // TODO(harshad) - Find heuristics for faster merge, even in a single thread. // e.g. Keep merging entries from smaller hash tables to larger. +// auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables(); + + auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables(); for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { - auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables(); if (hash_tables->size() > 1) { for (int hash_table_index = 0; hash_table_index < static_cast<int>(hash_tables->size() - 1); ++hash_table_index) { // Merge each hash table to the last hash table. - handles_[agg_idx]->mergeGroupByHashTables( - (*(*hash_tables)[hash_table_index]), + mergeGroupByHashTables( + (*hash_tables)[hash_table_index].get(), hash_tables->back().get()); } } + break; } // Collect per-aggregate finalized values. @@ -475,16 +527,16 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest agg_idx < handles_.size(); ++agg_idx) { if (is_distinct_[agg_idx]) { - DCHECK(group_by_hashtable_pools_[agg_idx] != nullptr); - auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables(); + DCHECK(group_by_hashtable_pools_[0] != nullptr); + auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables(); DCHECK(hash_tables != nullptr); if (hash_tables->empty()) { // We may have a case where hash_tables is empty, e.g. no input blocks. // However for aggregateOnDistinctifyHashTableForGroupBy to work // correctly, we should create an empty group by hash table. 
- AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable(); - group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table); - hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables(); + AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[0]->getHashTableFast(); + group_by_hashtable_pools_[0]->returnHashTable(new_hash_table); + hash_tables = group_by_hashtable_pools_[0]->getAllHashTables(); } DCHECK(hash_tables->back() != nullptr); AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get(); @@ -494,21 +546,22 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest agg_hash_table); } - auto *hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables(); + auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables(); DCHECK(hash_tables != nullptr); if (hash_tables->empty()) { // We may have a case where hash_tables is empty, e.g. no input blocks. // However for aggregateOnDistinctifyHashTableForGroupBy to work // correctly, we should create an empty group by hash table. 
- AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[agg_idx]->getHashTable(); - group_by_hashtable_pools_[agg_idx]->returnHashTable(new_hash_table); - hash_tables = group_by_hashtable_pools_[agg_idx]->getAllHashTables(); + AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[0]->getHashTable(); + group_by_hashtable_pools_[0]->returnHashTable(new_hash_table); + hash_tables = group_by_hashtable_pools_[0]->getAllHashTables(); } AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get(); DCHECK(agg_hash_table != nullptr); ColumnVector* agg_result_col = handles_[agg_idx]->finalizeHashTable(*agg_hash_table, - &group_by_keys); + &group_by_keys, + agg_idx); if (agg_result_col != nullptr) { final_values.emplace_back(agg_result_col); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index ecd116b..d408c22 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -165,6 +165,8 @@ class AggregationOperationState { **/ void finalizeAggregate(InsertDestination *output_destination); + int dflag; + private: // Merge locally (per storage block) aggregated states with global aggregation // states. @@ -185,7 +187,8 @@ class AggregationOperationState { // Each individual aggregate in this operation has an AggregationHandle and // some number of Scalar arguments. 
- std::vector<std::unique_ptr<AggregationHandle>> handles_; +// std::vector<std::unique_ptr<AggregationHandle>> handles_; + std::vector<AggregationHandle *> handles_; std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_; // For each aggregate, whether DISTINCT should be applied to the aggregate's @@ -215,6 +218,8 @@ class AggregationOperationState { StorageManager *storage_manager_; + void mergeGroupByHashTables(AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst); + DISALLOW_COPY_AND_ASSIGN(AggregationOperationState); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0756e7e/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index 65a7975..b6f2ef9 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -198,6 +198,9 @@ if (ENABLE_DISTRIBUTED) endif() add_library(quickstep_storage_EvictionPolicy EvictionPolicy.cpp EvictionPolicy.hpp) +add_library(quickstep_storage_FastHashTable ../empty_src.cpp FastHashTable.hpp) +add_library(quickstep_storage_FastHashTableFactory ../empty_src.cpp FastHashTableFactory.hpp) +add_library(quickstep_storage_FastSeparateChainingHashTable ../empty_src.cpp FastSeparateChainingHashTable.hpp) add_library(quickstep_storage_FileManager ../empty_src.cpp FileManager.hpp) if (QUICKSTEP_HAVE_FILE_MANAGER_HDFS) add_library(quickstep_storage_FileManagerHdfs FileManagerHdfs.cpp FileManagerHdfs.hpp) @@ -626,6 +629,55 @@ target_link_libraries(quickstep_storage_EvictionPolicy quickstep_threading_SpinMutex quickstep_threading_SpinSharedMutex quickstep_utility_Macros) +target_link_libraries(quickstep_storage_FastHashTable + quickstep_catalog_CatalogTypedefs + quickstep_expressions_aggregation_AggregationHandleAvg + quickstep_storage_HashTable + quickstep_storage_HashTableBase + quickstep_storage_StorageBlob + quickstep_storage_StorageBlockInfo + quickstep_storage_StorageConstants + 
quickstep_storage_StorageManager + quickstep_storage_TupleReference + quickstep_storage_ValueAccessor + quickstep_storage_ValueAccessorUtil + quickstep_threading_SpinMutex + quickstep_threading_SpinSharedMutex + quickstep_types_Type + quickstep_types_TypedValue + quickstep_utility_BloomFilter + quickstep_utility_HashPair + quickstep_utility_Macros) +target_link_libraries(quickstep_storage_FastHashTableFactory + glog + quickstep_storage_FastHashTable + quickstep_storage_FastSeparateChainingHashTable + quickstep_storage_HashTable + quickstep_storage_HashTable_proto + quickstep_storage_HashTableBase + quickstep_storage_HashTableFactory + quickstep_storage_LinearOpenAddressingHashTable + quickstep_storage_SeparateChainingHashTable + quickstep_storage_SimpleScalarSeparateChainingHashTable + quickstep_storage_TupleReference + quickstep_types_TypeFactory + quickstep_utility_BloomFilter + quickstep_utility_Macros) +target_link_libraries(quickstep_storage_FastSeparateChainingHashTable + quickstep_storage_FastHashTable + quickstep_storage_HashTable + quickstep_storage_HashTableBase + quickstep_storage_HashTableKeyManager + quickstep_storage_StorageBlob + quickstep_storage_StorageBlockInfo + quickstep_storage_StorageConstants + quickstep_storage_StorageManager + quickstep_threading_SpinSharedMutex + quickstep_types_Type + quickstep_types_TypedValue + quickstep_utility_Alignment + quickstep_utility_Macros + quickstep_utility_PrimeNumber) target_link_libraries(quickstep_storage_FileManager quickstep_storage_StorageBlockInfo quickstep_utility_Macros @@ -711,6 +763,8 @@ target_link_libraries(quickstep_storage_HashTableKeyManager target_link_libraries(quickstep_storage_HashTablePool glog quickstep_expressions_aggregation_AggregationHandle + quickstep_storage_FastHashTable + quickstep_storage_FastHashTableFactory quickstep_storage_HashTableBase quickstep_threading_SpinMutex quickstep_utility_Macros @@ -915,6 +969,7 @@ target_link_libraries(quickstep_storage_StorageBlock 
quickstep_storage_CompressedColumnStoreTupleStorageSubBlock quickstep_storage_CompressedPackedRowStoreTupleStorageSubBlock quickstep_storage_CountedReference + quickstep_storage_FastHashTable quickstep_storage_HashTableBase quickstep_storage_IndexSubBlock quickstep_storage_InsertDestinationInterface @@ -1098,6 +1153,9 @@ target_link_libraries(quickstep_storage quickstep_storage_EvictionPolicy quickstep_storage_FileManager quickstep_storage_FileManagerLocal + quickstep_storage_FastHashTable + quickstep_storage_FastHashTableFactory + quickstep_storage_FastSeparateChainingHashTable quickstep_storage_HashTable quickstep_storage_HashTable_proto quickstep_storage_HashTableBase