Repository: incubator-quickstep Updated Branches: refs/heads/agg-expr bc81c5b3f -> 1a6435f11
Updates Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1a6435f1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1a6435f1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1a6435f1 Branch: refs/heads/agg-expr Commit: 1a6435f11c35a2d7e73a97ef21528d5ac3c27320 Parents: bc81c5b Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Sun Feb 26 19:21:21 2017 -0600 Committer: Jianqiao Zhu <jianq...@cs.wisc.edu> Committed: Sun Feb 26 19:21:21 2017 -0600 ---------------------------------------------------------------------- expressions/aggregation/AggFunc.hpp | 10 + storage/AggregationOperationState.cpp | 68 +--- storage/AggregationUtil.hpp | 197 ++++++++++++ storage/CMakeLists.txt | 16 +- storage/CollisionFreeVectorTable.cpp | 160 +--------- storage/PackedPayloadHashTable.cpp | 177 ++++++++--- storage/PackedPayloadHashTable.hpp | 488 ++++++++++++++++++----------- 7 files changed, 673 insertions(+), 443 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1a6435f1/expressions/aggregation/AggFunc.hpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/AggFunc.hpp b/expressions/aggregation/AggFunc.hpp index 31f385e..98b19ba 100644 --- a/expressions/aggregation/AggFunc.hpp +++ b/expressions/aggregation/AggFunc.hpp @@ -69,6 +69,11 @@ class Sum { typename AggState<ArgType>::AtomicT> {}; template <typename ArgType> + inline static void InitAtomic(typename AggState<ArgType>::AtomicT *state) { + state->store(0, std::memory_order_relaxed); + } + + template <typename ArgType> inline static void MergeArgAtomic(const typename ArgType::cpptype &value, typename AggState<ArgType>::AtomicT *state) { LOG(FATAL) << "Not implemented"; @@ -81,6 +86,11 @@ class Sum { } template <typename ArgType> + inline static void InitUnsafe(typename AggState<ArgType>::T *state) { + *state = 0; + } + + template <typename ArgType> inline static void MergeArgUnsafe(const typename ArgType::cpptype &value, typename AggState<ArgType>::T *state) { *state += value; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1a6435f1/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index 00bb433..392e0f6 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -212,7 +212,7 @@ AggregationOperationState::AggregationOperationState( AggregationStateHashTableFactory::CreateResizable( hash_table_impl_type, group_by_types_, - estimated_num_entries, + estimated_num_entries * 2, group_by_handles, storage_manager_)); } else { @@ -384,12 +384,12 @@ bool AggregationOperationState::checkAggregatePartitioned( } } - // There are GROUP BYs without DISTINCT. Check if the estimated number of - // groups is large enough to warrant a partitioned aggregation. - return estimated_num_groups > - static_cast<std::size_t>( - FLAGS_partition_aggregation_num_groups_threshold); - return false; +// // There are GROUP BYs without DISTINCT. Check if the estimated number of +// // groups is large enough to warrant a partitioned aggregation. +// return estimated_num_groups > +// static_cast<std::size_t>( +// FLAGS_partition_aggregation_num_groups_threshold); + return true; } std::size_t AggregationOperationState::getNumInitializationPartitions() const { @@ -663,60 +663,10 @@ void AggregationOperationState::finalizeHashTableImplPartitioned( InsertDestination *output_destination) { PackedPayloadHashTable *hash_table = static_cast<PackedPayloadHashTable *>(partitioned_hashtable_.get()); +// std::cout << hash_table->numEntries() << "\n"; - // Each element of 'group_by_keys' is a vector of values for a particular - // group (which is also the prefix of the finalized Tuple for that group). - std::vector<std::vector<TypedValue>> group_by_keys; - - if (handles_.empty()) { - hash_table->forEachCompositeKeyInPartition( - partition_id, - [&](std::vector<TypedValue> &group_by_key) -> void { - group_by_keys.emplace_back(std::move(group_by_key)); - }); - } - - // Collect per-aggregate finalized values. - std::vector<std::unique_ptr<ColumnVector>> final_values; - for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) { - ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable( - *hash_table, agg_idx, &group_by_keys); - if (agg_result_col != nullptr) { - final_values.emplace_back(agg_result_col); - } - } -// hash_table->destroyPayload(); - - std::vector<std::unique_ptr<ColumnVector>> group_by_cvs; - std::size_t group_by_element_idx = 0; - for (const Type *group_by_type : group_by_types_) { - if (NativeColumnVector::UsableForType(*group_by_type)) { - NativeColumnVector *element_cv = - new NativeColumnVector(*group_by_type, group_by_keys.size()); - group_by_cvs.emplace_back(element_cv); - for (std::vector<TypedValue> &group_key : group_by_keys) { - element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); - } - } else { - IndirectColumnVector *element_cv = - new IndirectColumnVector(*group_by_type, group_by_keys.size()); - group_by_cvs.emplace_back(element_cv); - for (std::vector<TypedValue> &group_key : group_by_keys) { - element_cv->appendTypedValue(std::move(group_key[group_by_element_idx])); - } - } - ++group_by_element_idx; - } - - // Stitch together a ColumnVectorsValueAccessor combining the GROUP BY keys - // and the finalized aggregates. ColumnVectorsValueAccessor complete_result; - for (std::unique_ptr<ColumnVector> &group_by_cv : group_by_cvs) { - complete_result.addColumn(group_by_cv.release()); - } - for (std::unique_ptr<ColumnVector> &final_value_cv : final_values) { - complete_result.addColumn(final_value_cv.release()); - } + hash_table->finalize(partition_id, &complete_result); // Bulk-insert the complete result. output_destination->bulkInsertTuples(&complete_result); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1a6435f1/storage/AggregationUtil.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationUtil.hpp b/storage/AggregationUtil.hpp new file mode 100644 index 0000000..74a9095 --- /dev/null +++ b/storage/AggregationUtil.hpp @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_STORAGE_AGGREGATION_UTIL_HPP_ +#define QUICKSTEP_STORAGE_AGGREGATION_UTIL_HPP_ + +#include <type_traits> + +#include "expressions/aggregation/AggregationID.hpp" +#include "expressions/aggregation/AggFunc.hpp" +#include "storage/ValueAccessor.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" +#include "storage/ValueAccessorUtil.hpp" +#include "types/TypeID.hpp" +#include "types/Type.hpp" +#include "types/containers/ColumnVectorsValueAccessor.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +/** \addtogroup Storage + * @{ + */ + +template <typename T> +using remove_const_reference_t = std::remove_const_t<std::remove_reference_t<T>>; + +template <typename FunctorT> +inline auto InvokeOnKeyType(const Type &type, + const FunctorT &functor) { + switch (type.getTypeID()) { + case TypeID::kInt: + return functor(static_cast<const IntType&>(type)); + case TypeID::kLong: + return functor(static_cast<const LongType&>(type)); + default: + LOG(FATAL) << "Not supported"; + } +} + +template <typename FunctorT> +inline auto InvokeOnType(const Type &type, + const FunctorT &functor) { + switch (type.getTypeID()) { + case TypeID::kInt: + return functor(static_cast<const IntType&>(type)); + case TypeID::kLong: + return functor(static_cast<const LongType&>(type)); + case TypeID::kFloat: + return functor(static_cast<const FloatType&>(type)); + case TypeID::kDouble: + return functor(static_cast<const DoubleType&>(type)); + default: + LOG(FATAL) << "Not supported"; + } +} + +template <typename FunctorT> +inline auto InvokeOnBool(const bool &val, + const FunctorT &functor) { + if (val) { + return functor(std::true_type()); + } else { + return functor(std::false_type()); + } +} + +template <typename FunctorT> +inline auto InvokeOnBools(const bool &val1, + const bool &val2, + const FunctorT &functor) { + if (val1) { + if (val2) { + return functor(std::true_type(), std::true_type()); + } else { + return functor(std::true_type(), std::false_type()); + } + } else { + if (val2) { + return functor(std::false_type(), std::true_type()); + } else { + return functor(std::false_type(), std::false_type()); + } + } +} + +template <typename FunctorT> +inline auto InvokeOnAggFunc(const AggregationID &agg_id, + const FunctorT &functor) { + switch (agg_id) { + case AggregationID::kSum: { + return functor(Sum()); + } + default: + LOG(FATAL) << "Not supported"; + } +} + +template <typename FunctorT> +inline auto InvokeIf(const std::true_type &val, + const FunctorT &functor) { + return functor(); +} + +template <typename FunctorT> +inline void InvokeIf(const std::false_type &val, + const FunctorT &functor) { +} + +//template <typename FunctorT> +//inline void InvokeOnAggFuncIfApplicableToArgType( +// const AggregationID &agg_id, +// const Type &arg_type, +// const FunctorT &functor) { +// InvokeOnAggFunc( +// agg_id, +// [&](const auto &agg_func) -> void { +// InvokeOnType( +// arg_type, +// [&](const auto &arg_type) -> void { +// using AggFuncT = std::remove_reference_t<decltype(agg_func)>; +// using ArgT = remove_const_reference_t<decltype(arg_type)>; +// +// InvokeIf( +// typename AggFuncT::template HasAtomicImpl<ArgT>(), +// [&]() -> void { +// functor(agg_func, arg_type); +// }); +// }); +// }); +//} + +template <typename FunctorT> +inline void InvokeOnAggFuncWithArgType( + const AggregationID &agg_id, + const Type &arg_type, + const FunctorT &functor) { + InvokeOnAggFunc( + agg_id, + [&](const auto &agg_func) -> void { + InvokeOnType( + arg_type, + [&](const auto &arg_type) -> void { + functor(agg_func, arg_type); + }); + }); +} + +template <typename FunctorT> +inline auto InvokeOnTwoAccessors( + const ValueAccessorMultiplexer &accessor_mux, + const ValueAccessorSource &first_source, + const ValueAccessorSource &second_source, + const FunctorT &functor) { + ValueAccessor *base_accessor = accessor_mux.getBaseAccessor(); + ColumnVectorsValueAccessor *derived_accessor = + static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor()); + + InvokeOnAnyValueAccessor( + base_accessor, + [&](auto *accessor) { + if (first_source == ValueAccessorSource::kBase) { + if (second_source == ValueAccessorSource::kBase) { + return functor(std::false_type(), accessor, accessor); + } else { + return functor(std::true_type(), accessor, derived_accessor); + } + } else { + if (second_source == ValueAccessorSource::kBase) { + return functor(std::true_type(), derived_accessor, accessor); + } else { + return functor(std::false_type(), derived_accessor, derived_accessor); + } + } + }); +} + +} // namespace quickstep + +#endif // QUICKSTEP_STORAGE_AGGREGATION_UTIL_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1a6435f1/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index fcc069b..8ef3560 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -145,6 +145,7 @@ add_library(quickstep_storage_AggregationOperationState AggregationOperationState.cpp AggregationOperationState.hpp) add_library(quickstep_storage_AggregationOperationState_proto ${storage_AggregationOperationState_proto_srcs}) +add_library(quickstep_storage_AggregationUtil ../empty_src.cpp AggregationUtil.hpp) add_library(quickstep_storage_BasicColumnStoreTupleStorageSubBlock BasicColumnStoreTupleStorageSubBlock.cpp BasicColumnStoreTupleStorageSubBlock.hpp) @@ -304,6 +305,15 @@ target_link_libraries(quickstep_storage_AggregationOperationState_proto quickstep_expressions_aggregation_AggregateFunction_proto quickstep_storage_HashTable_proto ${PROTOBUF_LIBRARY}) +target_link_libraries(quickstep_storage_AggregationUtil + quickstep_expressions_aggregation_AggFunc + quickstep_expressions_aggregation_AggregationID + quickstep_storage_ValueAccessor + quickstep_storage_ValueAccessorMultiplexer + quickstep_storage_ValueAccessorUtil + quickstep_types_Type + quickstep_types_TypeID + quickstep_types_containers_ColumnVectorsValueAccessor) target_link_libraries(quickstep_storage_BasicColumnStoreTupleStorageSubBlock quickstep_catalog_CatalogAttribute quickstep_catalog_CatalogRelationSchema @@ -441,6 +451,7 @@ target_link_libraries(quickstep_storage_CollisionFreeVectorTable quickstep_expressions_aggregation_AggFunc quickstep_expressions_aggregation_AggregationHandle quickstep_expressions_aggregation_AggregationID + quickstep_storage_AggregationUtil quickstep_storage_HashTableBase quickstep_storage_StorageBlob quickstep_storage_StorageBlockInfo @@ -453,7 +464,6 @@ target_link_libraries(quickstep_storage_CollisionFreeVectorTable quickstep_types_Type quickstep_types_TypeID quickstep_types_containers_ColumnVector - quickstep_types_containers_ColumnVectorsValueAccessor quickstep_utility_BoolVector quickstep_utility_Macros) target_link_libraries(quickstep_storage_ColumnStoreUtil @@ -801,8 +811,11 @@ target_link_libraries(quickstep_storage_LinearOpenAddressingHashTable quickstep_utility_Macros quickstep_utility_PrimeNumber) target_link_libraries(quickstep_storage_PackedPayloadHashTable + ${GFLAGS_LIB_NAME} quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationHandle + quickstep_expressions_aggregation_AggFunc + quickstep_storage_AggregationUtil quickstep_storage_HashTableBase quickstep_storage_HashTableKeyManager quickstep_storage_StorageBlob @@ -1111,6 +1124,7 @@ add_library(quickstep_storage ../empty_src.cpp StorageModule.hpp) target_link_libraries(quickstep_storage quickstep_storage_AggregationOperationState quickstep_storage_AggregationOperationState_proto + quickstep_storage_AggregationUtil quickstep_storage_BasicColumnStoreTupleStorageSubBlock quickstep_storage_BasicColumnStoreValueAccessor quickstep_storage_BloomFilterIndexSubBlock http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1a6435f1/storage/CollisionFreeVectorTable.cpp ---------------------------------------------------------------------- diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp index c92f0ab..4c57cc9 100644 --- a/storage/CollisionFreeVectorTable.cpp +++ b/storage/CollisionFreeVectorTable.cpp @@ -29,14 +29,13 @@ #include "expressions/aggregation/AggregationHandle.hpp" #include "expressions/aggregation/AggregationID.hpp" +#include "storage/AggregationUtil.hpp" #include "storage/StorageBlockInfo.hpp" #include "storage/StorageManager.hpp" #include "storage/ValueAccessor.hpp" #include "storage/ValueAccessorMultiplexer.hpp" #include "storage/ValueAccessorUtil.hpp" #include "threading/SpinMutex.hpp" -#include "types/TypeID.hpp" -#include "types/containers/ColumnVectorsValueAccessor.hpp" #include "utility/BoolVector.hpp" #include "glog/logging.h" @@ -46,163 +45,6 @@ namespace quickstep { DEFINE_uint64(vt_threadprivate_threshold, 1000000L, ""); DEFINE_bool(use_latch, false, ""); -namespace { - -template <typename T> -using remove_const_reference_t = std::remove_const_t<std::remove_reference_t<T>>; - -template <typename FunctorT> -inline auto InvokeOnKeyType(const Type &type, - const FunctorT &functor) { - switch (type.getTypeID()) { - case TypeID::kInt: - return functor(static_cast<const IntType&>(type)); - case TypeID::kLong: - return functor(static_cast<const LongType&>(type)); - default: - LOG(FATAL) << "Not supported"; - } -} - -template <typename FunctorT> -inline auto InvokeOnType(const Type &type, - const FunctorT &functor) { - switch (type.getTypeID()) { - case TypeID::kInt: - return functor(static_cast<const IntType&>(type)); - case TypeID::kLong: - return functor(static_cast<const LongType&>(type)); - case TypeID::kFloat: - return functor(static_cast<const FloatType&>(type)); - case TypeID::kDouble: - return functor(static_cast<const DoubleType&>(type)); - default: - LOG(FATAL) << "Not supported"; - } -} - -template <typename FunctorT> -inline auto InvokeOnBool(const bool &val, - const FunctorT &functor) { - if (val) { - return functor(std::true_type()); - } else { - return functor(std::false_type()); - } -} - -template <typename FunctorT> -inline auto InvokeOnBools(const bool &val1, - const bool &val2, - const FunctorT &functor) { - if (val1) { - if (val2) { - return functor(std::true_type(), std::true_type()); - } else { - return functor(std::true_type(), std::false_type()); - } - } else { - if (val2) { - return functor(std::false_type(), std::true_type()); - } else { - return functor(std::false_type(), std::false_type()); - } - } -} - -template <typename FunctorT> -inline auto InvokeOnAggFunc(const AggregationID &agg_id, - const FunctorT &functor) { - switch (agg_id) { - case AggregationID::kSum: { - return functor(Sum()); - } - default: - LOG(FATAL) << "Not supported"; - } -} - -template <typename FunctorT> -inline auto InvokeIf(const std::true_type &val, - const FunctorT &functor) { - return functor(); -} - -template <typename FunctorT> -inline void InvokeIf(const std::false_type &val, - const FunctorT &functor) { -} - -//template <typename FunctorT> -//inline void InvokeOnAggFuncIfApplicableToArgType( -// const AggregationID &agg_id, -// const Type &arg_type, -// const FunctorT &functor) { -// InvokeOnAggFunc( -// agg_id, -// [&](const auto &agg_func) -> void { -// InvokeOnType( -// arg_type, -// [&](const auto &arg_type) -> void { -// using AggFuncT = std::remove_reference_t<decltype(agg_func)>; -// using ArgT = remove_const_reference_t<decltype(arg_type)>; -// -// InvokeIf( -// typename AggFuncT::template HasAtomicImpl<ArgT>(), -// [&]() -> void { -// functor(agg_func, arg_type); -// }); -// }); -// }); -//} - -template <typename FunctorT> -inline void InvokeOnAggFuncWithArgType( - const AggregationID &agg_id, - const Type &arg_type, - const FunctorT &functor) { - InvokeOnAggFunc( - agg_id, - [&](const auto &agg_func) -> void { - InvokeOnType( - arg_type, - [&](const auto &arg_type) -> void { - functor(agg_func, arg_type); - }); - }); -} - -template <typename FunctorT> -inline auto InvokeOnTwoAccessors( - const ValueAccessorMultiplexer &accessor_mux, - const ValueAccessorSource &first_source, - const ValueAccessorSource &second_source, - const FunctorT &functor) { - ValueAccessor *base_accessor = accessor_mux.getBaseAccessor(); - ColumnVectorsValueAccessor *derived_accessor = - static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor()); - - InvokeOnAnyValueAccessor( - base_accessor, - [&](auto *accessor) { - if (first_source == ValueAccessorSource::kBase) { - if (second_source == ValueAccessorSource::kBase) { - return functor(std::false_type(), accessor, accessor); - } else { - return functor(std::true_type(), accessor, derived_accessor); - } - } else { - if (second_source == ValueAccessorSource::kBase) { - return functor(std::true_type(), derived_accessor, accessor); - } else { - return functor(std::false_type(), derived_accessor, derived_accessor); - } - } - }); -} - -} // namespace - CollisionFreeVectorTable::CollisionFreeVectorTable( const Type *key_type, const std::size_t num_entries, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1a6435f1/storage/PackedPayloadHashTable.cpp ---------------------------------------------------------------------- diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp index bd7e960..c9d956c 100644 --- a/storage/PackedPayloadHashTable.cpp +++ b/storage/PackedPayloadHashTable.cpp @@ -26,6 +26,8 @@ #include <vector> #include "expressions/aggregation/AggregationHandle.hpp" +#include "expressions/aggregation/AggFunc.hpp" +#include "storage/AggregationUtil.hpp" #include "storage/HashTableKeyManager.hpp" #include "storage/StorageBlob.hpp" #include "storage/StorageBlockInfo.hpp" @@ -53,28 +55,14 @@ PackedPayloadHashTable::PackedPayloadHashTable( : key_types_(key_types), num_handles_(handles.size()), handles_(handles), - total_payload_size_(ComputeTotalPayloadSize(handles)), + state_sizes_(ComputeStateSizes(handles)), + total_payload_size_(ComputeTotalPayloadSize(state_sizes_)), + state_offsets_(ComputeStateOffsets(state_sizes_)), storage_manager_(storage_manager), kBucketAlignment(alignof(std::atomic<std::size_t>)), kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)), key_manager_(key_types_, kValueOffset + total_payload_size_), bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) { - std::size_t payload_offset_running_sum = sizeof(SpinMutex); - for (const auto *handle : handles) { - payload_offsets_.emplace_back(payload_offset_running_sum); - payload_offset_running_sum += handle->getPayloadSize(); - } - - // NOTE(jianqiao): Potential memory leak / double freeing by copying from - // init_payload to buckets if payload contains out of line data. - init_payload_ = - static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1)); - DCHECK(init_payload_ != nullptr); - - for (std::size_t i = 0; i < num_handles_; ++i) { - handles_[i]->initPayload(init_payload_ + payload_offsets_[i]); - } - // Bucket size always rounds up to the alignment requirement of the atomic // size_t "next" pointer at the front or a ValueT, whichever is larger. // @@ -192,7 +180,6 @@ PackedPayloadHashTable::~PackedPayloadHashTable() { blob_.release(); storage_manager_->deleteBlockOrBlobFile(blob_id); } - std::free(init_payload_); } void PackedPayloadHashTable::clear() { @@ -214,17 +201,6 @@ void PackedPayloadHashTable::clear() { } void PackedPayloadHashTable::destroyPayload() { - const std::size_t num_buckets = - header_->buckets_allocated.load(std::memory_order_relaxed); - void *bucket_ptr = static_cast<char *>(buckets_) + kValueOffset; - for (std::size_t bucket_num = 0; bucket_num < num_buckets; ++bucket_num) { - for (std::size_t handle_id = 0; handle_id < num_handles_; ++handle_id) { - void *value_internal_ptr = - static_cast<char *>(bucket_ptr) + this->payload_offsets_[handle_id]; - handles_[handle_id]->destroyPayload(static_cast<std::uint8_t *>(value_internal_ptr)); - } - bucket_ptr = static_cast<char *>(bucket_ptr) + bucket_size_; - } } bool PackedPayloadHashTable::upsertValueAccessorCompositeKey( @@ -232,24 +208,131 @@ bool PackedPayloadHashTable::upsertValueAccessorCompositeKey( const std::vector<MultiSourceAttributeId> &key_attr_ids, const ValueAccessorMultiplexer &accessor_mux) { ValueAccessor *base_accessor = accessor_mux.getBaseAccessor(); - ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor(); - - base_accessor->beginIterationVirtual(); - if (derived_accessor == nullptr) { - return upsertValueAccessorCompositeKeyInternal<false>( - argument_ids, - key_attr_ids, - base_accessor, - nullptr); - } else { - DCHECK(derived_accessor->getImplementationType() - == ValueAccessor::Implementation::kColumnVectors); - derived_accessor->beginIterationVirtual(); - return upsertValueAccessorCompositeKeyInternal<true>( - argument_ids, - key_attr_ids, - base_accessor, - static_cast<ColumnVectorsValueAccessor *>(derived_accessor)); + CHECK(accessor_mux.getDerivedAccessor() == nullptr); + + std::vector<attribute_id> key_ids; + for (const auto &key_attr_id : key_attr_ids) { + key_ids.emplace_back(key_attr_id.attr_id); + } + + for (std::size_t i = 0; i < num_handles_; ++i) { + DCHECK_LE(argument_ids[i].size(), 1u); + + const AggregationHandle *handle = handles_[i]; + const auto &argument_types = handle->getArgumentTypes(); + const auto &argument_ids_i = argument_ids[i]; + + attribute_id argument_id = kInvalidAttributeID; + const Type *argument_type = nullptr; + + if (argument_ids_i.empty()) { + LOG(FATAL) << "Not supported"; + } else { + DCHECK_EQ(1u, argument_ids_i.size()); + argument_id = argument_ids_i.front().attr_id; + + DCHECK_EQ(1u, argument_types.size()); + argument_type = argument_types.front(); + } + + InvokeOnAggFuncWithArgType( + handle->getAggregationID(), + *argument_type, + [&](const auto &agg_func, const auto &arg_type) { + using AggFuncT = std::remove_reference_t<decltype(agg_func)>; + using ArgT = remove_const_reference_t<decltype(arg_type)>; + + InvokeOnAnyValueAccessor( + base_accessor, + [&](auto *accessor) -> void { // NOLINT(build/c++11) + + if (key_ids.size() == 1) { + if (FLAGS_use_latch) { + this->upsertValueAccessorInternalUnaryLatch<AggFuncT, ArgT>( + argument_id, + key_ids.front(), + state_offsets_[i], + accessor); + } else { + this->upsertValueAccessorInternalUnaryAtomic<AggFuncT, ArgT>( + argument_id, + key_ids.front(), + state_offsets_[i], + accessor); + } + } else { + if (FLAGS_use_latch) { + this->upsertValueAccessorCompositeKeyInternalUnaryLatch<AggFuncT, ArgT>( + argument_id, + key_ids, + state_offsets_[i], + accessor); + } else { + this->upsertValueAccessorCompositeKeyInternalUnaryAtomic<AggFuncT, ArgT>( + argument_id, + key_ids, + state_offsets_[i], + accessor); + } + } + }); + }); + } + return true; +} + +void PackedPayloadHashTable::finalize(const std::size_t partition_id, + ColumnVectorsValueAccessor *results) { + const std::size_t start_position = + calculatePartitionStartPosition(partition_id); + const std::size_t end_position = + calculatePartitionEndPosition(partition_id); + + const std::size_t num_entries = end_position - start_position; + + for (std::size_t key_idx = 0; key_idx < key_types_.size(); ++key_idx) { + NativeColumnVector *key_cv = + new NativeColumnVector(*key_types_[key_idx], num_entries); + finalizeKey(start_position, end_position, key_idx, key_cv); + results->addColumn(key_cv); + } + + for (std::size_t i = 0; i < num_handles_; ++i) { + const AggregationHandle *handle = handles_[i]; + const auto &argument_types = handle->getArgumentTypes(); + + const Type *argument_type = nullptr; + if (argument_types.empty()) { + LOG(FATAL) << "Not supported"; + } else { + DCHECK_EQ(1u, argument_types.size()); + argument_type = argument_types.front(); + } + + NativeColumnVector *result_cv = + new NativeColumnVector(*handle->getResultType(), num_entries); + + InvokeOnAggFuncWithArgType( + handle->getAggregationID(), + *argument_type, + [&](const auto &agg_func, const auto &arg_type) { + using AggFuncT = std::remove_reference_t<decltype(agg_func)>; + using ArgT = remove_const_reference_t<decltype(arg_type)>; + + if (FLAGS_use_latch) { + this->finalizeStateLatch<AggFuncT, ArgT>(start_position, + end_position, + state_offsets_[i], + result_cv); + } else { + this->finalizeStateAtomic<AggFuncT, ArgT>(start_position, + end_position, + state_offsets_[i], + result_cv); + } + }); + + results->addColumn(result_cv); } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1a6435f1/storage/PackedPayloadHashTable.hpp ---------------------------------------------------------------------- diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp index 9ba5500..1a56f94 100644 --- a/storage/PackedPayloadHashTable.hpp +++ b/storage/PackedPayloadHashTable.hpp @@ -20,6 +20,7 @@ #ifndef QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_ #define QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_ +#include <algorithm> #include <atomic> #include <cstddef> #include <cstdint> @@ -29,6 +30,8 @@ #include "catalog/CatalogTypedefs.hpp" #include "expressions/aggregation/AggregationHandle.hpp" +#include "expressions/aggregation/AggFunc.hpp" +#include "storage/AggregationUtil.hpp" #include "storage/HashTableBase.hpp" #include "storage/HashTableKeyManager.hpp" #include "storage/StorageBlob.hpp" @@ -38,14 +41,20 @@ #include "threading/SpinMutex.hpp" #include "threading/SpinSharedMutex.hpp" #include "types/TypedValue.hpp" +#include "types/containers/ColumnVector.hpp" #include "types/containers/ColumnVectorsValueAccessor.hpp" #include "utility/HashPair.hpp" #include "utility/Macros.hpp" +#include "gflags/gflags.h" + #include "glog/logging.h" namespace quickstep { +DECLARE_int32(num_workers); +DECLARE_bool(use_latch); + class StorageManager; class Type; class ValueAccessor; @@ -126,6 +135,9 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase { const std::vector<MultiSourceAttributeId> &key_ids, const ValueAccessorMultiplexer &accessor_mux) override; + void finalize(const std::size_t partition_id, + ColumnVectorsValueAccessor *results); + /** * @return The ID of the StorageBlob used to store this hash table. **/ @@ -364,16 +376,55 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase { const std::uint8_t **value, std::size_t *entry_num) const; - inline std::uint8_t* upsertCompositeKeyInternal( - const std::vector<TypedValue> &key, - const std::size_t variable_key_size); + inline std::uint8_t* upsertInternal(const TypedValue &key); - template <bool use_two_accessors> - inline bool upsertValueAccessorCompositeKeyInternal( - const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, - const std::vector<MultiSourceAttributeId> &key_ids, - ValueAccessor *base_accessor, - ColumnVectorsValueAccessor *derived_accessor); + inline std::uint8_t* upsertCompositeKeyInternal( + const std::vector<TypedValue> &key); + + template <typename AggFuncT, typename ArgT, typename ValueAccessorT> + inline bool upsertValueAccessorInternalUnaryAtomic( + const attribute_id argument_ids, + const attribute_id key_id, + const std::size_t state_offset, + ValueAccessorT *accessor); + + template <typename AggFuncT, typename ArgT, typename ValueAccessorT> + inline bool upsertValueAccessorInternalUnaryLatch( + const attribute_id argument_ids, + const attribute_id key_id, + const std::size_t state_offset, + ValueAccessorT *accessor); + + template <typename AggFuncT, typename ArgT, typename ValueAccessorT> + inline bool upsertValueAccessorCompositeKeyInternalUnaryAtomic( + const attribute_id argument_id, + const std::vector<attribute_id> key_ids, + const std::size_t state_offset, + ValueAccessorT *accessor); + + template <typename AggFuncT, typename ArgT, typename ValueAccessorT> + inline bool upsertValueAccessorCompositeKeyInternalUnaryLatch( + const attribute_id argument_id, + const std::vector<attribute_id> key_ids, + const std::size_t state_offset, + ValueAccessorT *accessor); + + inline void finalizeKey(const std::size_t start_position, + const std::size_t end_position, + const std::size_t key_idx, + NativeColumnVector *output_cv); + + template <typename AggFuncT, typename ArgT> + inline void finalizeStateAtomic(const std::size_t start_position, + const std::size_t end_position, + const std::size_t state_offset, + NativeColumnVector *results); + + template <typename AggFuncT, typename ArgT> + inline void finalizeStateLatch(const std::size_t start_position, + const std::size_t end_position, + const std::size_t state_offset, + NativeColumnVector *results); // Generate a hash for a composite key by hashing each component of 'key' and // mixing their bits with CombineHashes(). @@ -387,33 +438,62 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase { key_inline_ = key_inline; } - inline static std::size_t ComputeTotalPayloadSize( + inline static std::vector<std::size_t> ComputeStateSizes( const std::vector<AggregationHandle *> &handles) { - std::size_t total_payload_size = sizeof(SpinMutex); - for (const auto *handle : handles) { - total_payload_size += handle->getPayloadSize(); + std::vector<std::size_t> state_sizes; + for (std::size_t i = 0; i < handles.size(); ++i) { + const AggregationHandle *handle = handles[i]; + InvokeOnAggFuncWithArgType( + handle->getAggregationID(), + *handle->getArgumentTypes().front(), + [&](const auto &agg_func, const auto &arg_type) { + using AggFuncT = std::remove_reference_t<decltype(agg_func)>; + using ArgT = remove_const_reference_t<decltype(arg_type)>; + + if (FLAGS_use_latch) { + state_sizes.emplace_back( + sizeof(typename AggFuncT::template AggState<ArgT>::T)); + } else { + state_sizes.emplace_back( + sizeof(typename AggFuncT::template AggState<ArgT>::AtomicT)); + } + }); + } + return state_sizes; + } + + inline static std::size_t ComputeTotalPayloadSize( + const std::vector<std::size_t> &state_sizes) { + const std::size_t mutex_size = + FLAGS_use_latch ? sizeof(SpinMutex) : 0; + const std::size_t total_state_size = + std::accumulate(state_sizes.begin(), state_sizes.end(), 0); + return mutex_size + total_state_size; + } + + inline static std::vector<std::size_t> ComputeStateOffsets( + const std::vector<std::size_t> &state_sizes) { + std::vector<std::size_t> state_offsets; + std::size_t state_offset = + FLAGS_use_latch ? sizeof(SpinMutex) : 0; + for (const std::size_t state_size : state_sizes) { + state_offsets.emplace_back(state_offset); + state_offset += state_size; } - return total_payload_size; + return state_offsets; } // Assign '*key_vector' with the attribute values specified by 'key_ids' at // the current position of 'accessor'. If 'check_for_null_keys' is true, stops // and returns true if any of the values is null, otherwise returns false. - template <bool use_two_accessors, - bool check_for_null_keys, + template <bool check_for_null_keys, typename ValueAccessorT> inline static bool GetCompositeKeyFromValueAccessor( - const std::vector<MultiSourceAttributeId> &key_ids, + const std::vector<attribute_id> &key_ids, const ValueAccessorT *accessor, - const ColumnVectorsValueAccessor *derived_accessor, std::vector<TypedValue> *key_vector) { for (std::size_t key_idx = 0; key_idx < key_ids.size(); ++key_idx) { - const MultiSourceAttributeId &key_id = key_ids[key_idx]; - if (use_two_accessors && key_id.source == ValueAccessorSource::kDerived) { - (*key_vector)[key_idx] = derived_accessor->getTypedValue(key_id.attr_id); - } else { - (*key_vector)[key_idx] = accessor->getTypedValue(key_id.attr_id); - } + (*key_vector)[key_idx] = accessor->getTypedValue(key_ids[key_idx]); if (check_for_null_keys && (*key_vector)[key_idx].isNull()) { return true; } @@ -441,9 +521,9 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase { const std::size_t num_handles_; const std::vector<AggregationHandle *> handles_; - std::size_t total_payload_size_; - std::vector<std::size_t> payload_offsets_; - std::uint8_t *init_payload_; + const std::vector<std::size_t> state_sizes_; + const std::size_t total_payload_size_; + const std::vector<std::size_t> state_offsets_; StorageManager *storage_manager_; MutableBlobReference blob_; @@ -471,8 +551,8 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase { // Set finalization segment size as 4096 entries. constexpr std::size_t kFinalizeSegmentSize = 4uL * 1024L; - // At least 1 partition, at most 80 partitions. - return std::max(1uL, std::min(num_entries / kFinalizeSegmentSize, 80uL)); + return std::max(static_cast<std::size_t>(FLAGS_num_workers), + std::min(num_entries / kFinalizeSegmentSize, 80uL)); } // Attempt to find an empty bucket to insert 'hash_code' into, starting after @@ -488,7 +568,6 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase { // deallocated after being allocated. inline bool locateBucketForInsertion( const std::size_t hash_code, - const std::size_t variable_key_allocation_required, void **bucket, std::atomic<std::size_t> **pending_chain_ptr, std::size_t *pending_chain_ptr_finish_value); @@ -636,7 +715,6 @@ inline bool PackedPayloadHashTable::getNextEntryCompositeKey( inline bool PackedPayloadHashTable::locateBucketForInsertion( const std::size_t hash_code, - const std::size_t variable_key_allocation_required, void **bucket, std::atomic<std::size_t> **pending_chain_ptr, std::size_t *pending_chain_ptr_finish_value) { @@ -652,17 +730,6 @@ inline bool PackedPayloadHashTable::locateBucketForInsertion( std::numeric_limits<std::size_t>::max(), std::memory_order_acq_rel)) { // Got to the end of the chain. Allocate a new bucket. - - // First, allocate variable-length key storage, if needed (i.e. if this - // is an upsert and we didn't allocate up-front). - if (!key_manager_.allocateVariableLengthKeyStorage( - variable_key_allocation_required)) { - // Ran out of variable-length storage. - (*pending_chain_ptr)->store(0, std::memory_order_release); - *bucket = nullptr; - return false; - } - const std::size_t allocated_bucket_num = header_->buckets_allocated.fetch_add(1, std::memory_order_relaxed); if (allocated_bucket_num >= header_->num_buckets) { @@ -730,27 +797,7 @@ inline const std::uint8_t* PackedPayloadHashTable::getSingleCompositeKey( inline const std::uint8_t* PackedPayloadHashTable::getSingleCompositeKey( const std::vector<TypedValue> &key, const std::size_t index) const { - DEBUG_ASSERT(this->key_types_.size() == key.size()); - - const std::size_t hash_code = this->hashCompositeKey(key); - std::size_t bucket_ref = - slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed); - while (bucket_ref != 0) { - DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max()); - const char *bucket = - static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_; - const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>( - bucket + sizeof(std::atomic<std::size_t>)); - if ((bucket_hash == hash_code) && - key_manager_.compositeKeyCollisionCheck(key, bucket)) { - // Match located. - return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset) + - this->payload_offsets_[index]; - } - bucket_ref = - reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load( - std::memory_order_relaxed); - } + LOG(FATAL) << "Not implemented"; // Reached the end of the chain and didn't find a match. return nullptr; @@ -759,24 +806,9 @@ inline const std::uint8_t* PackedPayloadHashTable::getSingleCompositeKey( inline bool PackedPayloadHashTable::upsertCompositeKey( const std::vector<TypedValue> &key, const std::uint8_t *source_state) { - const std::size_t variable_size = - calculateVariableLengthCompositeKeyCopySize(key); - for (;;) { - { - SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_); - std::uint8_t *value = - upsertCompositeKeyInternal(key, variable_size); - if (value != nullptr) { - SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value))); - for (unsigned int k = 0; k < num_handles_; ++k) { - handles_[k]->mergeStates(source_state + payload_offsets_[k], - value + payload_offsets_[k]); - } - return true; - } - } - resize(0, variable_size); - } + LOG(FATAL) << "Not implemented"; + + return true; } template <typename FunctorT> @@ -784,48 +816,56 @@ inline bool PackedPayloadHashTable::upsertCompositeKey( const std::vector<TypedValue> &key, FunctorT *functor, const std::size_t index) { - const std::size_t variable_size = - calculateVariableLengthCompositeKeyCopySize(key); - for (;;) { - { - SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_); - std::uint8_t *value = - upsertCompositeKeyInternal(key, variable_size); - if (value != nullptr) { - (*functor)(value + payload_offsets_[index]); - return true; - } - } - resize(0, variable_size); - } -} + LOG(FATAL) << "Not implemented"; + return true; +} -inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal( - const std::vector<TypedValue> &key, - const std::size_t variable_key_size) { - if (variable_key_size > 0) { - // Don't allocate yet, since the key may already be present. However, we - // do check if either the allocated variable storage space OR the free - // space is big enough to hold the key (at least one must be true: either - // the key is already present and allocated, or we need to be able to - // allocate enough space for it). - std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load( - std::memory_order_relaxed); - if ((allocated_bytes < variable_key_size) && - (allocated_bytes + variable_key_size > - key_manager_.getVariableLengthKeyStorageSize())) { +inline std::uint8_t* PackedPayloadHashTable::upsertInternal( + const TypedValue &key) { + const std::size_t hash_code = key.getHash(); + void *bucket = nullptr; + std::atomic<std::size_t> *pending_chain_ptr; + std::size_t pending_chain_ptr_finish_value; + for (;;) { + if (locateBucketForInsertion(hash_code, + &bucket, + &pending_chain_ptr, + &pending_chain_ptr_finish_value)) { + // Found an empty bucket. + break; + } else if (bucket == nullptr) { + // Ran out of buckets or variable-key space. return nullptr; + } else if (key_manager_.scalarKeyCollisionCheck(key, bucket)) { + // Found an already-existing entry for this key. + return reinterpret_cast<std::uint8_t *>(static_cast<char *>(bucket) + + kValueOffset); } } + // We are now writing to an empty bucket. + // Write the key and hash. + writeScalarKeyToBucket(key, hash_code, bucket); + + std::uint8_t *value = static_cast<unsigned char *>(bucket) + kValueOffset; +// std::memcpy(value, init_payload_, this->total_payload_size_); + + // Update the previous chain pointer to point to the new bucket. + pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release); + + // Return the value. + return value; +} + +inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal( + const std::vector<TypedValue> &key) { const std::size_t hash_code = this->hashCompositeKey(key); void *bucket = nullptr; std::atomic<std::size_t> *pending_chain_ptr; std::size_t pending_chain_ptr_finish_value; for (;;) { if (locateBucketForInsertion(hash_code, - variable_key_size, &bucket, &pending_chain_ptr, &pending_chain_ptr_finish_value)) { @@ -846,7 +886,8 @@ inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal( writeCompositeKeyToBucket(key, hash_code, bucket); std::uint8_t *value = static_cast<unsigned char *>(bucket) + kValueOffset; - std::memcpy(value, init_payload_, this->total_payload_size_); +// std::memcpy(value, init_payload_, this->total_payload_size_); + std::memset(value, 0, this->total_payload_size_); // Update the previous chaing pointer to point to the new bucket. pending_chain_ptr->store(pending_chain_ptr_finish_value, @@ -856,72 +897,165 @@ inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal( return value; } -template <bool use_two_accessors> -inline bool PackedPayloadHashTable::upsertValueAccessorCompositeKeyInternal( - const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, - const std::vector<MultiSourceAttributeId> &key_ids, - ValueAccessor *base_accessor, - ColumnVectorsValueAccessor *derived_accessor) { - std::size_t variable_size; +template <typename AggFuncT, typename ArgT, typename ValueAccessorT> +inline bool PackedPayloadHashTable + ::upsertValueAccessorInternalUnaryAtomic( + const attribute_id argument_id, + const attribute_id key_id, + const std::size_t state_offset, + ValueAccessorT *accessor) { + using StateT = typename AggFuncT::template AggState<ArgT>; + + accessor->beginIteration(); + while (accessor->next()) { + TypedValue key = accessor->getTypedValue(key_id); + std::uint8_t *payload = this->upsertInternal(key); + const auto *argument = static_cast<const typename ArgT::cpptype *>( + accessor->template getUntypedValue<false>(argument_id)); + auto *state = + reinterpret_cast<typename StateT::AtomicT *>(payload + state_offset); + + AggFuncT::template MergeArgAtomic<ArgT>(*argument, state); + } + return true; +} + +template <typename AggFuncT, typename ArgT, typename ValueAccessorT> +inline bool PackedPayloadHashTable + ::upsertValueAccessorInternalUnaryLatch( + const attribute_id argument_id, + const attribute_id key_id, + const std::size_t state_offset, + ValueAccessorT *accessor) { + using StateT = typename AggFuncT::template AggState<ArgT>; + + accessor->beginIteration(); + while (accessor->next()) { + TypedValue key = accessor->getTypedValue(key_id); + std::uint8_t *payload = this->upsertInternal(key); + const auto *argument = static_cast<const typename ArgT::cpptype *>( + accessor->template getUntypedValue<false>(argument_id)); + auto *state = + reinterpret_cast<typename StateT::T *>(payload + state_offset); + + SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(payload))); + AggFuncT::template MergeArgUnsafe<ArgT>(*argument, state); + } + return true; +} + +template <typename AggFuncT, typename ArgT, typename ValueAccessorT> +inline bool PackedPayloadHashTable:: + upsertValueAccessorCompositeKeyInternalUnaryAtomic( + const attribute_id argument_id, + const std::vector<attribute_id> key_ids, + const std::size_t state_offset, + ValueAccessorT *accessor) { + using StateT = typename AggFuncT::template AggState<ArgT>; + std::vector<TypedValue> key_vector; key_vector.resize(key_ids.size()); - return InvokeOnAnyValueAccessor( - base_accessor, - [&](auto *accessor) -> bool { // NOLINT(build/c++11) - bool continuing = true; - while (continuing) { - { - continuing = false; - SpinSharedMutexSharedLock<true> lock(resize_shared_mutex_); - while (accessor->next()) { - if (use_two_accessors) { - derived_accessor->next(); - } - if (this->GetCompositeKeyFromValueAccessor<use_two_accessors, true>( - key_ids, - accessor, - derived_accessor, - &key_vector)) { - continue; - } - variable_size = this->calculateVariableLengthCompositeKeyCopySize(key_vector); - std::uint8_t *value = this->upsertCompositeKeyInternal( - key_vector, variable_size); - if (value == nullptr) { - continuing = true; - break; - } else { - SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value))); - for (unsigned int k = 0; k < num_handles_; ++k) { - const auto &ids = argument_ids[k]; - if (ids.empty()) { - handles_[k]->updateStateNullary(value + payload_offsets_[k]); - } else { - const MultiSourceAttributeId &arg_id = ids.front(); - if (use_two_accessors && arg_id.source == ValueAccessorSource::kDerived) { - DCHECK_NE(arg_id.attr_id, kInvalidAttributeID); - handles_[k]->updateStateUnary(derived_accessor->getTypedValue(arg_id.attr_id), - value + payload_offsets_[k]); - } else { - handles_[k]->updateStateUnary(accessor->getTypedValue(arg_id.attr_id), - value + payload_offsets_[k]); - } - } - } - } - } - } - if (continuing) { - this->resize(0, variable_size); - accessor->previous(); - if (use_two_accessors) { - derived_accessor->previous(); - } - } - } - return true; - }); + accessor->beginIteration(); + while (accessor->next()) { + this->GetCompositeKeyFromValueAccessor<false>(key_ids, + accessor, + &key_vector); + std::uint8_t *payload = this->upsertCompositeKeyInternal(key_vector); + const auto *argument = static_cast<const typename ArgT::cpptype *>( + accessor->template getUntypedValue<false>(argument_id)); + auto *state = + reinterpret_cast<typename StateT::AtomicT *>(payload + state_offset); + + AggFuncT::template MergeArgAtomic<ArgT>(*argument, state); + } + return true; +} + +template <typename AggFuncT, typename ArgT, typename ValueAccessorT> +inline bool PackedPayloadHashTable:: + upsertValueAccessorCompositeKeyInternalUnaryLatch( + const attribute_id argument_id, + const std::vector<attribute_id> key_ids, + const std::size_t state_offset, + ValueAccessorT *accessor) { + using StateT = typename AggFuncT::template AggState<ArgT>; + + std::vector<TypedValue> key_vector; + key_vector.resize(key_ids.size()); + + accessor->beginIteration(); + while (accessor->next()) { + this->GetCompositeKeyFromValueAccessor<false>(key_ids, + accessor, + &key_vector); + std::uint8_t *payload = this->upsertCompositeKeyInternal(key_vector); + const auto *argument = static_cast<const typename ArgT::cpptype *>( + accessor->template getUntypedValue<false>(argument_id)); + auto *state = + reinterpret_cast<typename StateT::T *>(payload + state_offset); + + SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(payload))); + AggFuncT::template MergeArgUnsafe<ArgT>(*argument, state); + } + return true; +} + +inline void PackedPayloadHashTable + ::finalizeKey(const std::size_t start_position, + const std::size_t end_position, + const std::size_t key_idx, + NativeColumnVector *output_cv) { + for (std::size_t i = start_position; i < end_position; ++i) { + const char *bucket = + static_cast<const char *>(buckets_) + i * bucket_size_; + output_cv->appendTypedValue( + key_manager_.getKeyComponentTyped(bucket, key_idx)); + } +} + +template <typename AggFuncT, typename ArgT> +inline void PackedPayloadHashTable + ::finalizeStateAtomic(const std::size_t start_position, + const std::size_t end_position, + const std::size_t state_offset, + NativeColumnVector *output_cv) { + using StateT = typename AggFuncT::template AggState<ArgT>; + using ResultT = typename StateT::ResultT; + + const std::size_t offset = kValueOffset + state_offset; + for (std::size_t i = start_position; i < end_position; ++i) { + const char *bucket = + static_cast<const char *>(buckets_) + i * bucket_size_; + const auto *state = + reinterpret_cast<const typename StateT::AtomicT *>(bucket + offset); + + AggFuncT::template FinalizeAtomic<ArgT>( + *state, + static_cast<ResultT *>(output_cv->getPtrForDirectWrite())); + } +} + +template <typename AggFuncT, typename ArgT> +inline void PackedPayloadHashTable + ::finalizeStateLatch(const std::size_t start_position, + const std::size_t end_position, + const std::size_t state_offset, + NativeColumnVector *output_cv) { + using StateT = typename AggFuncT::template AggState<ArgT>; + using ResultT = typename StateT::ResultT; + + const std::size_t offset = kValueOffset + state_offset; + for (std::size_t i = start_position; i < end_position; ++i) { + const char *bucket = + static_cast<const char *>(buckets_) + i * bucket_size_; + const auto *state = + reinterpret_cast<const typename StateT::T *>(bucket + offset); + + AggFuncT::template FinalizeUnsafe<ArgT>( + *state, + static_cast<ResultT *>(output_cv->getPtrForDirectWrite())); + } } inline void PackedPayloadHashTable::writeScalarKeyToBucket( @@ -990,7 +1124,7 @@ inline std::size_t PackedPayloadHashTable::forEach( const std::uint8_t *value_ptr; while (getNextEntry(&key, &value_ptr, &entry_num)) { ++entries_visited; - (*functor)(key, value_ptr + payload_offsets_[index]); + (*functor)(key, value_ptr + state_offsets_[index]); key.clear(); } return entries_visited; @@ -1044,7 +1178,7 @@ inline std::size_t PackedPayloadHashTable::forEachCompositeKey( const std::uint8_t *value_ptr; while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) { ++entries_visited; - (*functor)(key, value_ptr + payload_offsets_[index]); + (*functor)(key, value_ptr + state_offsets_[index]); key.clear(); } return entries_visited;