Fix four unit test failures
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1665593e Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1665593e Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1665593e Branch: refs/heads/quickstep-28-29 Commit: 1665593ebf4553ea9b99ce7c10eeb2f6e577dcce Parents: d0756e7 Author: rathijit <rathi...@node-2.aggregation.quickstep-pg0.wisc.cloudlab.us> Authored: Fri Aug 5 06:00:12 2016 -0500 Committer: Harshad Deshmukh <hbdeshm...@apache.org> Committed: Tue Sep 20 12:56:07 2016 -0500 ---------------------------------------------------------------------- .../aggregation/AggregationConcreteHandle.cpp | 14 +++--- .../aggregation/AggregationConcreteHandle.hpp | 41 ++++++++++++++-- expressions/aggregation/AggregationHandle.hpp | 6 ++- .../aggregation/AggregationHandleAvg.cpp | 14 +++--- .../aggregation/AggregationHandleAvg.hpp | 15 +++++- .../aggregation/AggregationHandleCount.cpp | 7 +-- .../aggregation/AggregationHandleCount.hpp | 19 ++++++-- .../aggregation/AggregationHandleDistinct.cpp | 2 +- .../aggregation/AggregationHandleDistinct.hpp | 2 +- .../aggregation/AggregationHandleMax.cpp | 14 +++--- .../aggregation/AggregationHandleMax.hpp | 13 ++++- .../aggregation/AggregationHandleMin.cpp | 14 +++--- .../aggregation/AggregationHandleMin.hpp | 15 +++++- .../aggregation/AggregationHandleSum.cpp | 15 +++--- .../aggregation/AggregationHandleSum.hpp | 15 +++++- storage/AggregationOperationState.cpp | 51 +++++++++++--------- storage/CMakeLists.txt | 1 - storage/FastHashTable.hpp | 41 +++++++++++++++- storage/FastSeparateChainingHashTable.hpp | 16 +++--- 19 files changed, 221 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/expressions/aggregation/AggregationConcreteHandle.cpp 
---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp index 1efe010..ac5148b 100644 --- a/expressions/aggregation/AggregationConcreteHandle.cpp +++ b/expressions/aggregation/AggregationConcreteHandle.cpp @@ -52,17 +52,17 @@ void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable( AggregationStateHashTableBase *distinctify_hash_table) const { // If the key-value pair is already there, we don't need to update the value, // which should always be "true". I.e. the value is just a placeholder. -// const auto noop_upserter = [](const auto &accessor, const bool *value) -> void {}; + // const auto noop_upserter = [](const auto &accessor, const bool *value) -> void {}; AggregationStateFastHashTable *hash_table = static_cast<AggregationStateFastHashTable *>(distinctify_hash_table); if (key_ids.size() == 1) { -// TODO(rathijit): fix -// hash_table->upsertValueAccessor(accessor, -// key_ids[0], -// true /* check_for_null_keys */, -// true /* initial_value */, -// &noop_upserter); + std::vector<std::vector<attribute_id>> args; + args.emplace_back(key_ids); + hash_table->upsertValueAccessorFast(args, + accessor, + key_ids[0], + true /* check_for_null_keys */); } else { std::vector<std::vector<attribute_id>> empty_args; empty_args.resize(1); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/expressions/aggregation/AggregationConcreteHandle.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp index d332ec9..609937a 100644 --- a/expressions/aggregation/AggregationConcreteHandle.hpp +++ b/expressions/aggregation/AggregationConcreteHandle.hpp @@ -27,6 +27,7 @@ #include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationHandle.hpp" 
#include "storage/HashTable.hpp" +#include "storage/FastHashTable.hpp" #include "storage/HashTableBase.hpp" #include "types/TypedValue.hpp" #include "types/containers/ColumnVector.hpp" @@ -278,6 +279,11 @@ class AggregationConcreteHandle : public AggregationHandle { const AggregationStateHashTableBase &distinctify_hash_table) const; template <typename HandleT, + typename StateT> + StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelperFast( + const AggregationStateHashTableBase &distinctify_hash_table) const; + + template <typename HandleT, typename StateT, typename HashTableT> void aggregateOnDistinctifyHashTableForGroupByUnaryHelper( @@ -289,7 +295,7 @@ class AggregationConcreteHandle : public AggregationHandle { typename HashTableT> void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *hash_table) const; + AggregationStateHashTableBase *hash_table, int index) const; template <typename HandleT, @@ -494,6 +500,31 @@ StateT* AggregationConcreteHandle::aggregateOnDistinctifyHashTableForSingleUnary } template <typename HandleT, + typename StateT> +StateT* AggregationConcreteHandle::aggregateOnDistinctifyHashTableForSingleUnaryHelperFast( + const AggregationStateHashTableBase &distinctify_hash_table) const { + const HandleT& handle = static_cast<const HandleT&>(*this); + StateT *state = static_cast<StateT*>(createInitialState()); + + // A lambda function which will be called on each key from the distinctify + // hash table. + const auto aggregate_functor = [&handle, &state](const TypedValue &key, + const std::uint8_t &dumb_placeholder) { + // For each (unary) key in the distinctify hash table, aggregate the key + // into "state". 
+ handle.iterateUnaryInl(state, key); + }; + + const AggregationStateFastHashTable &hash_table = + static_cast<const AggregationStateFastHashTable &>(distinctify_hash_table); + // Invoke the lambda function "aggregate_functor" on each key from the distinctify + // hash table. + hash_table.forEach(&aggregate_functor); + + return state; +} + +template <typename HandleT, typename StateT, typename HashTableT> void AggregationConcreteHandle::aggregateOnDistinctifyHashTableForGroupByUnaryHelper( @@ -534,13 +565,13 @@ template <typename HandleT, typename HashTableT> void AggregationConcreteHandle::aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table) const { + AggregationStateHashTableBase *aggregation_hash_table, int index) const { const HandleT& handle = static_cast<const HandleT&>(*this); HashTableT *target_hash_table = static_cast<HashTableT*>(aggregation_hash_table); // A lambda function which will be called on each key-value pair from the // distinctify hash table. - const auto aggregate_functor = [&handle, &target_hash_table]( + const auto aggregate_functor = [&handle, &target_hash_table, &index]( std::vector<TypedValue> &key, const bool &dumb_placeholder) { // For each (composite) key vector in the distinctify hash table with size N. @@ -552,10 +583,10 @@ void AggregationConcreteHandle::aggregateOnDistinctifyHashTableForGroupByUnaryHe // An upserter as lambda function for aggregating the argument into its // GROUP BY group's entry inside aggregation_hash_table. 
const auto upserter = [&handle, &argument](std::uint8_t *state) { - handle.iterateUnaryInlFast(argument, state+sizeof(SpinMutex)); + handle.iterateUnaryInlFast(argument, state); }; - target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter); + target_hash_table->upsertCompositeKeyFast(key, nullptr, &upserter, index); }; const HashTableT &source_hash_table = http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/expressions/aggregation/AggregationHandle.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp index 92cd6a7..7c9e544 100644 --- a/expressions/aggregation/AggregationHandle.hpp +++ b/expressions/aggregation/AggregationHandle.hpp @@ -347,7 +347,7 @@ class AggregationHandle { */ virtual void aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table) const = 0; + AggregationStateHashTableBase *aggregation_hash_table, int index) const = 0; /** * @brief Merge two GROUP BY hash tables in one. 
@@ -362,11 +362,13 @@ class AggregationHandle { const AggregationStateHashTableBase &source_hash_table, AggregationStateHashTableBase *destination_hash_table) const = 0; - virtual size_t getPayloadSize() const {return 8;} + virtual size_t getPayloadSize() const {return 1;} virtual void setPayloadOffset(std::size_t) {} virtual void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) {} virtual void mergeStatesFast(const uint8_t *src, uint8_t *dst) const {} virtual void initPayload(uint8_t *byte_ptr) {} + virtual void BlockUpdate() {} + virtual void AllowUpdate() {} protected: AggregationHandle() { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/expressions/aggregation/AggregationHandleAvg.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleAvg.cpp b/expressions/aggregation/AggregationHandleAvg.cpp index f38c628..383a81f 100644 --- a/expressions/aggregation/AggregationHandleAvg.cpp +++ b/expressions/aggregation/AggregationHandleAvg.cpp @@ -42,7 +42,7 @@ namespace quickstep { class StorageManager; AggregationHandleAvg::AggregationHandleAvg(const Type &type) - : argument_type_(type) { + : argument_type_(type), block_update(false) { // We sum Int as Long and Float as Double so that we have more headroom when // adding many values. 
TypeID type_precision_id; @@ -206,7 +206,7 @@ ColumnVector* AggregationHandleAvg::finalizeHashTable( AggregationState* AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle( const AggregationStateHashTableBase &distinctify_hash_table) const { - return aggregateOnDistinctifyHashTableForSingleUnaryHelper< + return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< AggregationHandleAvg, AggregationStateAvg>( distinctify_hash_table); @@ -214,14 +214,12 @@ AggregationState* AggregationHandleAvg::aggregateOnDistinctifyHashTableForSingle void AggregationHandleAvg::aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table) const { - aggregateOnDistinctifyHashTableForGroupByUnaryHelper< + AggregationStateHashTableBase *aggregation_hash_table, int index) const { + aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< AggregationHandleAvg, - AggregationStateAvg, - AggregationStateHashTable<AggregationStateAvg>>( + AggregationStateFastHashTable>( distinctify_hash_table, - blank_state_, - aggregation_hash_table); + aggregation_hash_table, index); } void AggregationHandleAvg::mergeGroupByHashTables( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/expressions/aggregation/AggregationHandleAvg.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleAvg.hpp b/expressions/aggregation/AggregationHandleAvg.hpp index e187d22..15835e0 100644 --- a/expressions/aggregation/AggregationHandleAvg.hpp +++ b/expressions/aggregation/AggregationHandleAvg.hpp @@ -123,7 +123,7 @@ class AggregationHandleAvg : public AggregationConcreteHandle { ++state->count_; } - inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) { + inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) const { 
DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature())); if (value.isNull()) return; TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset); @@ -133,9 +133,18 @@ class AggregationHandleAvg : public AggregationConcreteHandle { } inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override { + if (block_update) return; iterateUnaryInlFast(arguments.front(), byte_ptr); } + void BlockUpdate() override { + block_update = true; + } + + void AllowUpdate() override { + block_update = false; + } + void initPayload(uint8_t *byte_ptr) override { TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset); std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr + blank_state_.count_offset); @@ -208,7 +217,7 @@ class AggregationHandleAvg : public AggregationConcreteHandle { */ void aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table) const override; + AggregationStateHashTableBase *aggregation_hash_table, int index) const override; void mergeGroupByHashTables( const AggregationStateHashTableBase &source_hash_table, @@ -235,6 +244,8 @@ class AggregationHandleAvg : public AggregationConcreteHandle { std::unique_ptr<UncheckedBinaryOperator> merge_add_operator_; std::unique_ptr<UncheckedBinaryOperator> divide_operator_; + bool block_update; + DISALLOW_COPY_AND_ASSIGN(AggregationHandleAvg); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/expressions/aggregation/AggregationHandleCount.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleCount.cpp b/expressions/aggregation/AggregationHandleCount.cpp index f1eadf1..3a333ef 100644 --- a/expressions/aggregation/AggregationHandleCount.cpp +++ b/expressions/aggregation/AggregationHandleCount.cpp @@ -196,7 +196,7 
@@ AggregationState* AggregationHandleCount<count_star, nullable_type> ::aggregateOnDistinctifyHashTableForSingle( const AggregationStateHashTableBase &distinctify_hash_table) const { DCHECK_EQ(count_star, false); - return aggregateOnDistinctifyHashTableForSingleUnaryHelper< + return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< AggregationHandleCount<count_star, nullable_type>, AggregationStateCount>( distinctify_hash_table); @@ -206,13 +206,14 @@ template <bool count_star, bool nullable_type> void AggregationHandleCount<count_star, nullable_type> ::aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table) const { + AggregationStateHashTableBase *aggregation_hash_table, int index) const { DCHECK_EQ(count_star, false); aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< AggregationHandleCount<count_star, nullable_type>, AggregationStateFastHashTable>( distinctify_hash_table, - aggregation_hash_table); + aggregation_hash_table, + index); } template <bool count_star, bool nullable_type> http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/expressions/aggregation/AggregationHandleCount.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleCount.hpp b/expressions/aggregation/AggregationHandleCount.hpp index ed21c41..a95cae5 100644 --- a/expressions/aggregation/AggregationHandleCount.hpp +++ b/expressions/aggregation/AggregationHandleCount.hpp @@ -135,15 +135,24 @@ class AggregationHandleCount : public AggregationConcreteHandle { } inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override { + if (block_update) return; if (arguments.size()) iterateUnaryInlFast(arguments.front(), byte_ptr); else iterateNullaryInlFast(byte_ptr); } + void BlockUpdate() override { + block_update = true; + } + + void AllowUpdate() override { + 
block_update = false; + } + void initPayload(uint8_t *byte_ptr) override { - std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr); - *count_ptr = 0; + std::int64_t *count_ptr = reinterpret_cast<std::int64_t *>(byte_ptr); + *count_ptr = 0; } AggregationState* accumulateNullary(const std::size_t num_tuples) const override { @@ -208,7 +217,7 @@ class AggregationHandleCount : public AggregationConcreteHandle { */ void aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table) const override; + AggregationStateHashTableBase *aggregation_hash_table, int index) const override; void mergeGroupByHashTables( const AggregationStateHashTableBase &source_hash_table, @@ -224,9 +233,11 @@ class AggregationHandleCount : public AggregationConcreteHandle { /** * @brief Constructor. **/ - AggregationHandleCount() { + AggregationHandleCount() : block_update(false) { } + bool block_update; + DISALLOW_COPY_AND_ASSIGN(AggregationHandleCount); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/expressions/aggregation/AggregationHandleDistinct.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp index 2b9391a..a5fc095 100644 --- a/expressions/aggregation/AggregationHandleDistinct.cpp +++ b/expressions/aggregation/AggregationHandleDistinct.cpp @@ -72,7 +72,7 @@ ColumnVector* AggregationHandleDistinct::finalizeHashTable( const bool &dumb_placeholder) -> void { group_by_keys->emplace_back(std::move(group_by_key)); }; - static_cast<const AggregationStateHashTable<bool>&>(hash_table).forEachCompositeKey(&keys_retriever); + static_cast<const AggregationStateFastHashTable&>(hash_table).forEachCompositeKey(&keys_retriever); return nullptr; } 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/expressions/aggregation/AggregationHandleDistinct.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp index 0a3acb3..f6ef0c7 100644 --- a/expressions/aggregation/AggregationHandleDistinct.hpp +++ b/expressions/aggregation/AggregationHandleDistinct.hpp @@ -90,7 +90,7 @@ class AggregationHandleDistinct : public AggregationConcreteHandle { void aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *groupby_hash_table) const override { + AggregationStateHashTableBase *groupby_hash_table, int index) const override { LOG(FATAL) << "AggregationHandleDistinct does not support " << "aggregateOnDistinctifyHashTableForGroupBy()."; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/expressions/aggregation/AggregationHandleMax.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMax.cpp b/expressions/aggregation/AggregationHandleMax.cpp index 2080a03..c11fcc8 100644 --- a/expressions/aggregation/AggregationHandleMax.cpp +++ b/expressions/aggregation/AggregationHandleMax.cpp @@ -39,7 +39,7 @@ namespace quickstep { class StorageManager; AggregationHandleMax::AggregationHandleMax(const Type &type) - : type_(type) { + : type_(type), block_update(false) { fast_comparator_.reset(ComparisonFactory::GetComparison(ComparisonID::kGreater) .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion())); @@ -135,7 +135,7 @@ ColumnVector* AggregationHandleMax::finalizeHashTable( AggregationState* AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle( const AggregationStateHashTableBase &distinctify_hash_table) const { - return aggregateOnDistinctifyHashTableForSingleUnaryHelper< + 
return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< AggregationHandleMax, AggregationStateMax>( distinctify_hash_table); @@ -143,14 +143,12 @@ AggregationState* AggregationHandleMax::aggregateOnDistinctifyHashTableForSingle void AggregationHandleMax::aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table) const { - aggregateOnDistinctifyHashTableForGroupByUnaryHelper< + AggregationStateHashTableBase *aggregation_hash_table, int index) const { + aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< AggregationHandleMax, - AggregationStateMax, - AggregationStateHashTable<AggregationStateMax>>( + AggregationStateFastHashTable>( distinctify_hash_table, - AggregationStateMax(type_), - aggregation_hash_table); + aggregation_hash_table, index); } void AggregationHandleMax::mergeGroupByHashTables( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/expressions/aggregation/AggregationHandleMax.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMax.hpp b/expressions/aggregation/AggregationHandleMax.hpp index 3c06fc4..82d6ebb 100644 --- a/expressions/aggregation/AggregationHandleMax.hpp +++ b/expressions/aggregation/AggregationHandleMax.hpp @@ -113,9 +113,18 @@ class AggregationHandleMax : public AggregationConcreteHandle { } inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override { + if (block_update) return; iterateUnaryInlFast(arguments.front(), byte_ptr); } + void BlockUpdate() override { + block_update = true; + } + + void AllowUpdate() override { + block_update = false; + } + void initPayload(uint8_t *byte_ptr) override { TypedValue *max_ptr = reinterpret_cast<TypedValue *>(byte_ptr); TypedValue t1 = (type_.getNullableVersion().makeNullValue()); @@ -175,7 +184,7 @@ class AggregationHandleMax : public 
AggregationConcreteHandle { */ void aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table) const override; + AggregationStateHashTableBase *aggregation_hash_table, int index) const override; void mergeGroupByHashTables( const AggregationStateHashTableBase &source_hash_table, @@ -221,6 +230,8 @@ class AggregationHandleMax : public AggregationConcreteHandle { const Type &type_; std::unique_ptr<UncheckedComparator> fast_comparator_; + bool block_update; + DISALLOW_COPY_AND_ASSIGN(AggregationHandleMax); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/expressions/aggregation/AggregationHandleMin.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMin.cpp b/expressions/aggregation/AggregationHandleMin.cpp index 9d5be72..70d6c1b 100644 --- a/expressions/aggregation/AggregationHandleMin.cpp +++ b/expressions/aggregation/AggregationHandleMin.cpp @@ -39,7 +39,7 @@ namespace quickstep { class StorageManager; AggregationHandleMin::AggregationHandleMin(const Type &type) - : type_(type) { + : type_(type), block_update(false) { fast_comparator_.reset(ComparisonFactory::GetComparison(ComparisonID::kLess) .makeUncheckedComparatorForTypes(type, type.getNonNullableVersion())); @@ -136,7 +136,7 @@ ColumnVector* AggregationHandleMin::finalizeHashTable( AggregationState* AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle( const AggregationStateHashTableBase &distinctify_hash_table) const { - return aggregateOnDistinctifyHashTableForSingleUnaryHelper< + return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< AggregationHandleMin, AggregationStateMin>( distinctify_hash_table); @@ -144,14 +144,12 @@ AggregationState* AggregationHandleMin::aggregateOnDistinctifyHashTableForSingle void AggregationHandleMin::aggregateOnDistinctifyHashTableForGroupBy( const 
AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table) const { - aggregateOnDistinctifyHashTableForGroupByUnaryHelper< + AggregationStateHashTableBase *aggregation_hash_table, int index) const { + aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< AggregationHandleMin, - AggregationStateMin, - AggregationStateHashTable<AggregationStateMin>>( + AggregationStateFastHashTable>( distinctify_hash_table, - AggregationStateMin(type_), - aggregation_hash_table); + aggregation_hash_table, index); } void AggregationHandleMin::mergeGroupByHashTables( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/expressions/aggregation/AggregationHandleMin.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleMin.hpp b/expressions/aggregation/AggregationHandleMin.hpp index 6329cd7..0f5e3a1 100644 --- a/expressions/aggregation/AggregationHandleMin.hpp +++ b/expressions/aggregation/AggregationHandleMin.hpp @@ -110,16 +110,25 @@ class AggregationHandleMin : public AggregationConcreteHandle { compareAndUpdate(state, value); } - inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) { + inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) const { DCHECK(value.isPlausibleInstanceOf(type_.getSignature())); TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr); compareAndUpdateFast(min_ptr, value); } inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override { + if (block_update) return; iterateUnaryInlFast(arguments.front(), byte_ptr); } + void BlockUpdate() override { + block_update = true; + } + + void AllowUpdate() override { + block_update = false; + } + void initPayload(uint8_t *byte_ptr) override { TypedValue *min_ptr = reinterpret_cast<TypedValue *>(byte_ptr); TypedValue t1 = (type_.getNullableVersion().makeNullValue()); @@ -178,7 +187,7 @@ 
class AggregationHandleMin : public AggregationConcreteHandle { */ void aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table) const override; + AggregationStateHashTableBase *aggregation_hash_table, int index) const override; void mergeGroupByHashTables( const AggregationStateHashTableBase &source_hash_table, @@ -223,6 +232,8 @@ class AggregationHandleMin : public AggregationConcreteHandle { const Type &type_; std::unique_ptr<UncheckedComparator> fast_comparator_; + bool block_update; + DISALLOW_COPY_AND_ASSIGN(AggregationHandleMin); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/expressions/aggregation/AggregationHandleSum.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp index 7a16605..534db30 100644 --- a/expressions/aggregation/AggregationHandleSum.cpp +++ b/expressions/aggregation/AggregationHandleSum.cpp @@ -43,7 +43,7 @@ namespace quickstep { class StorageManager; AggregationHandleSum::AggregationHandleSum(const Type &type) - : argument_type_(type) { + : argument_type_(type), block_update(false) { // We sum Int as Long and Float as Double so that we have more headroom when // adding many values. 
TypeID type_precision_id; @@ -184,7 +184,7 @@ ColumnVector* AggregationHandleSum::finalizeHashTable( AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle( const AggregationStateHashTableBase &distinctify_hash_table) const { - return aggregateOnDistinctifyHashTableForSingleUnaryHelper< + return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast< AggregationHandleSum, AggregationStateSum>( distinctify_hash_table); @@ -192,14 +192,13 @@ AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table) const { - aggregateOnDistinctifyHashTableForGroupByUnaryHelper< + AggregationStateHashTableBase *aggregation_hash_table, int index) const { + aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast< AggregationHandleSum, - AggregationStateSum, - AggregationStateHashTable<AggregationStateSum>>( + AggregationStateFastHashTable>( distinctify_hash_table, - blank_state_, - aggregation_hash_table); + aggregation_hash_table, + index); } void AggregationHandleSum::mergeGroupByHashTables( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/expressions/aggregation/AggregationHandleSum.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp index 79f8331..3a2252d 100644 --- a/expressions/aggregation/AggregationHandleSum.hpp +++ b/expressions/aggregation/AggregationHandleSum.hpp @@ -118,7 +118,7 @@ class AggregationHandleSum : public AggregationConcreteHandle { state->null_ = false; } - inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) { + inline void iterateUnaryInlFast(const TypedValue &value, uint8_t *byte_ptr) const { 
DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature())); if (value.isNull()) return; TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset); @@ -128,9 +128,18 @@ class AggregationHandleSum : public AggregationConcreteHandle { } inline void iterateInlFast(const std::vector<TypedValue> &arguments, uint8_t *byte_ptr) override { + if (block_update) return; iterateUnaryInlFast(arguments.front(), byte_ptr); } + void BlockUpdate() override { + block_update = true; + } + + void AllowUpdate() override { + block_update = false; + } + void initPayload(uint8_t *byte_ptr) override { TypedValue *sum_ptr = reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset); bool *null_ptr = reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset); @@ -188,7 +197,7 @@ class AggregationHandleSum : public AggregationConcreteHandle { */ void aggregateOnDistinctifyHashTableForGroupBy( const AggregationStateHashTableBase &distinctify_hash_table, - AggregationStateHashTableBase *aggregation_hash_table) const override; + AggregationStateHashTableBase *aggregation_hash_table, int index) const override; void mergeGroupByHashTables( const AggregationStateHashTableBase &source_hash_table, @@ -214,6 +223,8 @@ class AggregationHandleSum : public AggregationConcreteHandle { std::unique_ptr<UncheckedBinaryOperator> fast_operator_; std::unique_ptr<UncheckedBinaryOperator> merge_operator_; + bool block_update; + DISALLOW_COPY_AND_ASSIGN(AggregationHandleSum); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index 7d6d179..833b707 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -105,7 +105,8 @@ AggregationOperationState::AggregationOperationState( new 
HashTablePool(estimated_num_entries, hash_table_impl_type, group_by_types, - handles_.back(), + {1}, + handles_, storage_manager))); } else { // Set up each individual aggregate in this operation. @@ -142,8 +143,11 @@ AggregationOperationState::AggregationOperationState( group_by_types, handles_.back().get(), storage_manager)));*/ + if (*is_distinct_it) { + handles_.back()->BlockUpdate(); + } group_by_handles.emplace_back(handles_.back()); - payload_sizes.emplace_back(handles_.back()->getPayloadSize()); + payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize()); } else { // Aggregation without GROUP BY: create a single global state. @@ -186,26 +190,26 @@ AggregationOperationState::AggregationOperationState( estimated_num_entries, storage_manager));*/ -std::vector<AggregationHandle *> local; -local.emplace_back(handles_.back()); + std::vector<AggregationHandle *> local; + // local.emplace_back(handles_.back()); + local.clear(); distinctify_hashtables_.emplace_back( -AggregationStateFastHashTableFactory::CreateResizable( + AggregationStateFastHashTableFactory::CreateResizable( *distinctify_hash_table_impl_types_it, key_types, estimated_num_entries, {0}, local, storage_manager)); - ++distinctify_hash_table_impl_types_it; } else { distinctify_hashtables_.emplace_back(nullptr); } } - if (!group_by_handles.empty()) { - // Aggregation with GROUP BY: create a HashTable pool for per-group states. - group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>( + if (!group_by_handles.empty()) { + // Aggregation with GROUP BY: create a HashTable pool for per-group states. 
+ group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>( new HashTablePool(estimated_num_entries, hash_table_impl_type, group_by_types, @@ -213,7 +217,7 @@ AggregationStateFastHashTableFactory::CreateResizable( group_by_handles, storage_manager))); } - } + } } AggregationOperationState* AggregationOperationState::ReconstructFromProto( @@ -442,13 +446,15 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo distinctify_hashtables_[agg_idx].get(), &reuse_matches, &reuse_group_by_vectors); - } else { - // Call StorageBlock::aggregateGroupBy() to aggregate this block's values - // directly into the (threadsafe) shared global HashTable for this - // aggregate. - DCHECK(group_by_hashtable_pools_[0] != nullptr); - AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[0]->getHashTableFast(); - DCHECK(agg_hash_table != nullptr); + } + } + + // Call StorageBlock::aggregateGroupBy() to aggregate this block's values + // directly into the (threadsafe) shared global HashTable for this + // aggregate. 
+ DCHECK(group_by_hashtable_pools_[0] != nullptr); + AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[0]->getHashTableFast(); + DCHECK(agg_hash_table != nullptr); /* block->aggregateGroupBy(*handles_[agg_idx], arguments_[agg_idx], group_by_list_, @@ -456,16 +462,13 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo agg_hash_table, &reuse_matches, &reuse_group_by_vectors);*/ - block->aggregateGroupByFast(arguments_, + block->aggregateGroupByFast(arguments_, group_by_list_, predicate_.get(), agg_hash_table, &reuse_matches, &reuse_group_by_vectors); - group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table); - break; - } - } + group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table); } void AggregationOperationState::finalizeSingleState(InsertDestination *output_destination) { @@ -541,9 +544,11 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest DCHECK(hash_tables->back() != nullptr); AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get(); DCHECK(agg_hash_table != nullptr); + handles_[agg_idx]->AllowUpdate(); handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy( *distinctify_hashtables_[agg_idx], - agg_hash_table); + agg_hash_table, + agg_idx); } auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index b6f2ef9..79a5b87 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -631,7 +631,6 @@ target_link_libraries(quickstep_storage_EvictionPolicy quickstep_utility_Macros) target_link_libraries(quickstep_storage_FastHashTable quickstep_catalog_CatalogTypedefs - quickstep_expressions_aggregation_AggregationHandleAvg quickstep_storage_HashTable quickstep_storage_HashTableBase 
quickstep_storage_StorageBlob http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/storage/FastHashTable.hpp ---------------------------------------------------------------------- diff --git a/storage/FastHashTable.hpp b/storage/FastHashTable.hpp index 12e447f..cba039a 100644 --- a/storage/FastHashTable.hpp +++ b/storage/FastHashTable.hpp @@ -35,7 +35,6 @@ #include "storage/TupleReference.hpp" #include "storage/ValueAccessor.hpp" #include "storage/ValueAccessorUtil.hpp" -#include "expressions/aggregation/AggregationHandleAvg.hpp" #include "threading/SpinSharedMutex.hpp" #include "threading/SpinMutex.hpp" #include "types/Type.hpp" @@ -434,6 +433,11 @@ class FastHashTable : public HashTableBase<resizable, const uint8_t *init_value_ptr, FunctorT *functor); + template <typename FunctorT> + bool upsertCompositeKeyFast(const std::vector<TypedValue> &key, + const uint8_t *init_value_ptr, + FunctorT *functor, int index); + bool upsertCompositeKeyNewFast(const std::vector<TypedValue> &key, const uint8_t *init_value_ptr, const uint8_t *source_state); @@ -1851,6 +1855,41 @@ template <bool resizable, bool serializable, bool force_key_copy, bool allow_duplicate_keys> +template <typename FunctorT> +bool FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys> + ::upsertCompositeKeyFast(const std::vector<TypedValue> &key, + const std::uint8_t *init_value_ptr, + FunctorT *functor, int index) { + DEBUG_ASSERT(!allow_duplicate_keys); + const std::size_t variable_size = calculateVariableLengthCompositeKeyCopySize(key); + if (resizable) { + for (;;) { + { + SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_); + uint8_t *value = upsertCompositeKeyInternalFast(key, init_value_ptr, variable_size); + if (value != nullptr) { + (*functor)(value+payload_offsets_[index]); + return true; + } + } + resize(0, variable_size); + } + } else { + uint8_t *value = upsertCompositeKeyInternalFast(key, init_value_ptr, variable_size); + if (value == 
nullptr) { + return false; + } else { + (*functor)(value+payload_offsets_[index]); + return true; + } + } +} + + +template <bool resizable, + bool serializable, + bool force_key_copy, + bool allow_duplicate_keys> bool FastHashTable<resizable, serializable, force_key_copy, allow_duplicate_keys> ::upsertCompositeKeyNewFast(const std::vector<TypedValue> &key, const std::uint8_t *init_value_ptr, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1665593e/storage/FastSeparateChainingHashTable.hpp ---------------------------------------------------------------------- diff --git a/storage/FastSeparateChainingHashTable.hpp b/storage/FastSeparateChainingHashTable.hpp index 64c4979..756d6e5 100644 --- a/storage/FastSeparateChainingHashTable.hpp +++ b/storage/FastSeparateChainingHashTable.hpp @@ -308,8 +308,11 @@ FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_dup key_manager_(this->key_types_, kValueOffset + this->total_payload_size_), bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) { init_payload_ = static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1)); - for (auto handle : handles) - handle->initPayload(init_payload_); + int k = 0; + for (auto handle : handles) { + handle->initPayload(init_payload_+this->payload_offsets_[k]); + k++; + } // Bucket size always rounds up to the alignment requirement of the atomic // size_t "next" pointer at the front or a ValueT, whichever is larger. // @@ -437,8 +440,7 @@ FastSeparateChainingHashTable<resizable, serializable, force_key_copy, allow_dup true), kBucketAlignment(alignof(std::atomic<std::size_t>) < alignof(uint8_t) ? 
alignof(uint8_t) : alignof(std::atomic<std::size_t>)), - kValueOffset((((sizeof(std::atomic<std::size_t>) + sizeof(std::size_t) - 1) / - alignof(uint8_t)) + 1) * alignof(uint8_t)), + kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)), key_manager_(this->key_types_, kValueOffset + sizeof(uint8_t)), bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) { // Bucket size always rounds up to the alignment requirement of the atomic @@ -1046,7 +1048,6 @@ uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy, else memcpy(value, init_value_ptr, this->total_payload_size_); - // Update the previous chain pointer to point to the new bucket. pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release); @@ -1168,10 +1169,11 @@ uint8_t* FastSeparateChainingHashTable<resizable, serializable, force_key_copy, // uint8_t *value; // value = static_cast<unsigned char*>(bucket) + kValueOffset; uint8_t *value = static_cast<unsigned char*>(bucket) + kValueOffset; - if (init_value_ptr == nullptr) + if (init_value_ptr == nullptr) { memcpy(value, init_payload_, this->total_payload_size_); - else + } else { memcpy(value, init_value_ptr, this->total_payload_size_); + } // Update the previous chaing pointer to point to the new bucket. pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);