Repository: incubator-quickstep Updated Branches: refs/heads/common-subexpression c27f5beb8 -> 675dcf8de
Updates Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/675dcf8d Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/675dcf8d Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/675dcf8d Branch: refs/heads/common-subexpression Commit: 675dcf8de09a341f6b1a9a0cc0622d27bc8a1ff3 Parents: c27f5be Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Wed Apr 19 15:11:24 2017 -0500 Committer: Jianqiao Zhu <jianq...@cs.wisc.edu> Committed: Wed Apr 19 15:11:24 2017 -0500 ---------------------------------------------------------------------- query_optimizer/ExecutionGenerator.cpp | 2 +- query_optimizer/PhysicalGenerator.cpp | 2 +- .../rules/ReuseAggregateExpressions.cpp | 50 +- .../rules/ReuseAggregateExpressions.hpp | 5 +- storage/AggregationOperationState.cpp | 16 +- storage/AggregationOperationState.hpp | 2 +- storage/CMakeLists.txt | 14 +- storage/HashTable.proto | 2 +- storage/HashTableBase.hpp | 2 +- storage/HashTableFactory.hpp | 10 +- storage/ThreadPrivateCompactKeyHashTable.cpp | 35 ++ storage/ThreadPrivateCompactKeyHashTable.hpp | 483 +++++++++++++++++++ storage/ThreadPrivateNumericHashTable.cpp | 0 storage/ThreadPrivateNumericHashTable.hpp | 483 ------------------- 14 files changed, 588 insertions(+), 518 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index 0304e2e..8f44cc1 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -1590,7 +1590,7 @@ void ExecutionGenerator::convertAggregate( physical_plan, estimated_num_groups)) { std::cout << "Use two phase numeric\n"; 
aggr_state_proto->set_hash_table_impl_type( - serialization::HashTableImplType::THREAD_PRIVATE_NUMERIC); + serialization::HashTableImplType::THREAD_PRIVATE_COMPACT_KEY); } else { // Otherwise, use SeparateChaining. std::cout << "Use normal\n"; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/query_optimizer/PhysicalGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp index 427c365..07bd024 100644 --- a/query_optimizer/PhysicalGenerator.cpp +++ b/query_optimizer/PhysicalGenerator.cpp @@ -149,7 +149,7 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() { rules.emplace_back(new ReorderColumns()); } - rules.emplace_back(new ReuseAggregateExpressions()); + rules.emplace_back(new ReuseAggregateExpressions(optimizer_context_)); rules.emplace_back(new FuseAggregateJoin()); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/query_optimizer/rules/ReuseAggregateExpressions.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/ReuseAggregateExpressions.cpp b/query_optimizer/rules/ReuseAggregateExpressions.cpp index 77dfd1e..3036833 100644 --- a/query_optimizer/rules/ReuseAggregateExpressions.cpp +++ b/query_optimizer/rules/ReuseAggregateExpressions.cpp @@ -26,7 +26,9 @@ #include <vector> #include "expressions/aggregation/AggregateFunction.hpp" +#include "expressions/aggregation/AggregateFunctionFactory.hpp" #include "expressions/aggregation/AggregationID.hpp" +#include "query_optimizer/OptimizerContext.hpp" #include "query_optimizer/expressions/AggregateFunction.hpp" #include "query_optimizer/expressions/Alias.hpp" #include "query_optimizer/expressions/AttributeReference.hpp" @@ -81,13 +83,19 @@ P::PhysicalPtr ReuseAggregateExpressions::applyToNode( for (std::size_t i = 0; i < agg_exprs.size(); ++i) { DCHECK(agg_exprs[i]->expression()->getExpressionType() 
== E::ExpressionType::kAggregateFunction); - const E::AggregateFunctionPtr agg_func = + const E::AggregateFunctionPtr agg_expr = std::static_pointer_cast<const E::AggregateFunction>( agg_exprs[i]->expression()); - const AggregationID agg_id = agg_func->getAggregate().getAggregationID(); + + // Skip DISTINCT aggregations. + if (agg_expr->is_distinct()) { + continue; + } + + const AggregationID agg_id = agg_expr->getAggregate().getAggregationID(); + const std::vector<E::ScalarPtr> &arguments = agg_expr->getArguments(); // Currently we only consider aggregate functions with 0 or 1 argument. - const std::vector<E::ScalarPtr> &arguments = agg_func->getArguments(); if (agg_id == AggregationID::kCount) { if (arguments.empty()) { count_star_info.emplace_front(i); @@ -124,11 +132,10 @@ P::PhysicalPtr ReuseAggregateExpressions::applyToNode( // First, check whether AVG can be reduced to SUM/COUNT. bool is_avg_processed = false; - const auto sum_it = ref_map.find(AggregationID::kSum); const auto avg_it = ref_map.find(AggregationID::kAvg); - - if (avg_it != ref_map.end() && sum_it != ref_map.end()) { + if (avg_it != ref_map.end()) { std::size_t count_ref = kInvalidRef; + if (it.first->getValueType().isNullable()) { const auto count_it = ref_map.find(AggregationID::kCount); if (count_it != ref_map.end()) { @@ -140,8 +147,10 @@ P::PhysicalPtr ReuseAggregateExpressions::applyToNode( } if (count_ref != kInvalidRef) { - DCHECK(!sum_it->second.empty()); - const std::size_t sum_ref = sum_it->second.front(); + const auto sum_it = ref_map.find(AggregationID::kSum); + const std::size_t sum_ref = + sum_it == ref_map.end() ? 
kInvalidRef : sum_it->second.front(); + + for (const std::size_t idx : avg_it->second) { agg_refs[idx].reset(new AggregateReference(sum_ref, count_ref)); } @@ -205,11 +214,34 @@ P::PhysicalPtr ReuseAggregateExpressions::applyToNode( break; } case AggregateReference::kAvg: { + E::AttributeReferencePtr sum_attr; + if (agg_ref->first_ref == kInvalidRef) { + const E::AggregateFunctionPtr avg_expr = + std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression()); + + const AggregateFunction &sum_func = + AggregateFunctionFactory::Get(AggregationID::kSum); + const E::AggregateFunctionPtr sum_expr = + E::AggregateFunction::Create(sum_func, + avg_expr->getArguments(), + avg_expr->is_vector_aggregate(), + avg_expr->is_distinct()); + new_agg_exprs.emplace_back( + E::Alias::Create(optimizer_context_->nextExprId(), + sum_expr, + agg_expr->attribute_name(), + agg_expr->attribute_alias())); + + sum_attr = E::ToRef(new_agg_exprs.back()); + } else { + sum_attr = agg_attrs[agg_ref->first_ref]; + } + const BinaryOperation &divide_op = BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide); const E::BinaryExpressionPtr avg_expr = E::BinaryExpression::Create(divide_op, - agg_attrs[agg_ref->first_ref], + sum_attr, agg_attrs[agg_ref->second_ref]); new_select_exprs.emplace_back( E::Alias::Create(agg_expr->id(), http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/query_optimizer/rules/ReuseAggregateExpressions.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/ReuseAggregateExpressions.hpp b/query_optimizer/rules/ReuseAggregateExpressions.hpp index 0b18023..5b78e46 100644 --- a/query_optimizer/rules/ReuseAggregateExpressions.hpp +++ b/query_optimizer/rules/ReuseAggregateExpressions.hpp @@ -42,7 +42,8 @@ class OptimizerContext; class ReuseAggregateExpressions : public BottomUpRule<physical::Physical> { public: - ReuseAggregateExpressions() {} + ReuseAggregateExpressions(OptimizerContext 
*optimizer_context) + : optimizer_context_(optimizer_context) {} std::string getName() const override { return "ReuseAggregateExpressions"; @@ -84,6 +85,8 @@ class ReuseAggregateExpressions : public BottomUpRule<physical::Physical> { const std::size_t second_ref; }; + OptimizerContext *optimizer_context_; + std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_; DISALLOW_COPY_AND_ASSIGN(ReuseAggregateExpressions); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/AggregationOperationState.cpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp index d6f21bc..6337b5e 100644 --- a/storage/AggregationOperationState.cpp +++ b/storage/AggregationOperationState.cpp @@ -49,7 +49,7 @@ #include "storage/StorageBlockInfo.hpp" #include "storage/StorageManager.hpp" #include "storage/SubBlocksReference.hpp" -#include "storage/ThreadPrivateNumericHashTable.hpp" +#include "storage/ThreadPrivateCompactKeyHashTable.hpp" #include "storage/TupleIdSequence.hpp" #include "storage/TupleStorageSubBlock.hpp" #include "storage/ValueAccessor.hpp" @@ -99,7 +99,7 @@ AggregationOperationState::AggregationOperationState( case HashTableImplType::kCollisionFreeVector: is_aggregate_collision_free_ = true; break; - case HashTableImplType::kThreadPrivateNumeric: + case HashTableImplType::kThreadPrivateCompactKey: break; default: is_aggregate_partitioned_ = checkAggregatePartitioned( @@ -725,8 +725,8 @@ void AggregationOperationState::finalizeHashTable( case HashTableImplType::kSeparateChaining: finalizeHashTableImplThreadPrivatePackedPayload(output_destination); break; - case HashTableImplType::kThreadPrivateNumeric: - finalizeHashTableImplThreadPrivateNumeric(output_destination); + case HashTableImplType::kThreadPrivateCompactKey: + finalizeHashTableImplThreadPrivateCompactKey(output_destination); break; default: LOG(FATAL) << "Not supported"; @@ -963,7 
+963,7 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivatePackedPayload( output_destination->bulkInsertTuples(&complete_result); } -void AggregationOperationState::finalizeHashTableImplThreadPrivateNumeric( +void AggregationOperationState::finalizeHashTableImplThreadPrivateCompactKey( InsertDestination *output_destination) { auto *hash_tables = group_by_hashtable_pool_->getAllHashTables(); DCHECK(hash_tables != nullptr); @@ -971,13 +971,13 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivateNumeric( return; } - std::unique_ptr<ThreadPrivateNumericHashTable> final_hash_table( - static_cast<ThreadPrivateNumericHashTable*>(hash_tables->back().release())); + std::unique_ptr<ThreadPrivateCompactKeyHashTable> final_hash_table( + static_cast<ThreadPrivateCompactKeyHashTable*>(hash_tables->back().release())); for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) { std::unique_ptr<AggregationStateHashTableBase> hash_table( hash_tables->at(i).release()); final_hash_table->merge( - static_cast<const ThreadPrivateNumericHashTable*>(hash_table.get())); + static_cast<const ThreadPrivateCompactKeyHashTable*>(hash_table.get())); hash_table->destroyPayload(); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/AggregationOperationState.hpp ---------------------------------------------------------------------- diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp index e666a68..207c4f0 100644 --- a/storage/AggregationOperationState.hpp +++ b/storage/AggregationOperationState.hpp @@ -259,7 +259,7 @@ class AggregationOperationState { void finalizeHashTableImplThreadPrivatePackedPayload( InsertDestination *output_destination); - void finalizeHashTableImplThreadPrivateNumeric( + void finalizeHashTableImplThreadPrivateCompactKey( InsertDestination *output_destination); std::size_t getMemoryConsumptionBytesHelper( 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index b971240..0f610eb 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -250,9 +250,9 @@ add_library(quickstep_storage_StorageManager StorageManager.cpp StorageManager.h add_library(quickstep_storage_SubBlockTypeRegistry SubBlockTypeRegistry.cpp SubBlockTypeRegistry.hpp) add_library(quickstep_storage_SubBlockTypeRegistryMacros ../empty_src.cpp SubBlockTypeRegistryMacros.hpp) add_library(quickstep_storage_SubBlocksReference ../empty_src.cpp SubBlocksReference.hpp) -add_library(quickstep_storage_ThreadPrivateNumericHashTable - ThreadPrivateNumericHashTable.cpp - ThreadPrivateNumericHashTable.hpp) +add_library(quickstep_storage_ThreadPrivateCompactKeyHashTable + ThreadPrivateCompactKeyHashTable.cpp + ThreadPrivateCompactKeyHashTable.hpp) add_library(quickstep_storage_TupleIdSequence ../empty_src.cpp TupleIdSequence.hpp) add_library(quickstep_storage_TupleReference ../empty_src.cpp TupleReference.hpp) add_library(quickstep_storage_TupleStorageSubBlock TupleStorageSubBlock.cpp TupleStorageSubBlock.hpp) @@ -292,7 +292,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState quickstep_storage_StorageBlockInfo quickstep_storage_StorageManager quickstep_storage_SubBlocksReference - quickstep_storage_ThreadPrivateNumericHashTable + quickstep_storage_ThreadPrivateCompactKeyHashTable quickstep_storage_TupleIdSequence quickstep_storage_TupleStorageSubBlock quickstep_storage_ValueAccessor @@ -728,7 +728,7 @@ target_link_libraries(quickstep_storage_HashTableFactory quickstep_storage_PackedPayloadHashTable quickstep_storage_SeparateChainingHashTable quickstep_storage_SimpleScalarSeparateChainingHashTable - quickstep_storage_ThreadPrivateNumericHashTable + quickstep_storage_ThreadPrivateCompactKeyHashTable 
quickstep_storage_TupleReference quickstep_types_Type quickstep_types_TypeFactory @@ -1044,7 +1044,7 @@ target_link_libraries(quickstep_storage_SubBlockTypeRegistry target_link_libraries(quickstep_storage_SubBlocksReference glog quickstep_utility_PtrVector) -target_link_libraries(quickstep_storage_ThreadPrivateNumericHashTable +target_link_libraries(quickstep_storage_ThreadPrivateCompactKeyHashTable glog quickstep_catalog_CatalogTypedefs quickstep_expressions_aggregation_AggregationHandle @@ -1183,7 +1183,7 @@ target_link_libraries(quickstep_storage quickstep_storage_SubBlockTypeRegistry quickstep_storage_SubBlockTypeRegistryMacros quickstep_storage_SubBlocksReference - quickstep_storage_ThreadPrivateNumericHashTable + quickstep_storage_ThreadPrivateCompactKeyHashTable quickstep_storage_TupleIdSequence quickstep_storage_TupleReference quickstep_storage_TupleStorageSubBlock http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/HashTable.proto ---------------------------------------------------------------------- diff --git a/storage/HashTable.proto b/storage/HashTable.proto index 80e363c..ed383df 100644 --- a/storage/HashTable.proto +++ b/storage/HashTable.proto @@ -26,7 +26,7 @@ enum HashTableImplType { LINEAR_OPEN_ADDRESSING = 1; SEPARATE_CHAINING = 2; SIMPLE_SCALAR_SEPARATE_CHAINING = 3; - THREAD_PRIVATE_NUMERIC = 4; + THREAD_PRIVATE_COMPACT_KEY = 4; } // NOTE(chasseur): This proto describes the run-time parameters for a resizable http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/HashTableBase.hpp ---------------------------------------------------------------------- diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp index c3cbddf..6982a3f 100644 --- a/storage/HashTableBase.hpp +++ b/storage/HashTableBase.hpp @@ -45,7 +45,7 @@ enum class HashTableImplType { kLinearOpenAddressing, kSeparateChaining, kSimpleScalarSeparateChaining, - kThreadPrivateNumeric + kThreadPrivateCompactKey }; /** 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/HashTableFactory.hpp ---------------------------------------------------------------------- diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp index 52f4d5f..cb1f16f 100644 --- a/storage/HashTableFactory.hpp +++ b/storage/HashTableFactory.hpp @@ -32,7 +32,7 @@ #include "storage/PackedPayloadHashTable.hpp" #include "storage/SeparateChainingHashTable.hpp" #include "storage/SimpleScalarSeparateChainingHashTable.hpp" -#include "storage/ThreadPrivateNumericHashTable.hpp" +#include "storage/ThreadPrivateCompactKeyHashTable.hpp" #include "storage/TupleReference.hpp" #include "types/TypeFactory.hpp" #include "utility/BloomFilter.hpp" @@ -124,8 +124,8 @@ inline HashTableImplType HashTableImplTypeFromProto( return HashTableImplType::kSeparateChaining; case serialization::HashTableImplType::SIMPLE_SCALAR_SEPARATE_CHAINING: return HashTableImplType::kSimpleScalarSeparateChaining; - case serialization::HashTableImplType::THREAD_PRIVATE_NUMERIC: - return HashTableImplType::kThreadPrivateNumeric; + case serialization::HashTableImplType::THREAD_PRIVATE_COMPACT_KEY: + return HashTableImplType::kThreadPrivateCompactKey; default: { LOG(FATAL) << "Unrecognized serialization::HashTableImplType\n"; } @@ -372,8 +372,8 @@ class AggregationStateHashTableFactory { case HashTableImplType::kSeparateChaining: return new PackedPayloadHashTable( key_types, num_entries, handles, storage_manager); - case HashTableImplType::kThreadPrivateNumeric: - return new ThreadPrivateNumericHashTable( + case HashTableImplType::kThreadPrivateCompactKey: + return new ThreadPrivateCompactKeyHashTable( key_types, num_entries, handles, storage_manager); default: { LOG(FATAL) << "Unrecognized HashTableImplType in " http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/ThreadPrivateCompactKeyHashTable.cpp ---------------------------------------------------------------------- diff --git 
a/storage/ThreadPrivateCompactKeyHashTable.cpp b/storage/ThreadPrivateCompactKeyHashTable.cpp new file mode 100644 index 0000000..226188b --- /dev/null +++ b/storage/ThreadPrivateCompactKeyHashTable.cpp @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#include "storage/ThreadPrivateCompactKeyHashTable.hpp" + +#include <cstddef> +#include <cstdint> + +#include "expressions/aggregation/AggregationID.hpp" +#include "types/TypeID.hpp" + +#include "glog/logging.h" + +namespace quickstep { + + + + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/ThreadPrivateCompactKeyHashTable.hpp ---------------------------------------------------------------------- diff --git a/storage/ThreadPrivateCompactKeyHashTable.hpp b/storage/ThreadPrivateCompactKeyHashTable.hpp new file mode 100644 index 0000000..a22fdab --- /dev/null +++ b/storage/ThreadPrivateCompactKeyHashTable.hpp @@ -0,0 +1,483 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_ +#define QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_ + +#include <algorithm> +#include <cstddef> +#include <cstdint> +#include <unordered_map> +#include <vector> + +#include "catalog/CatalogTypedefs.hpp" +#include "expressions/aggregation/AggregationHandle.hpp" +#include "expressions/aggregation/AggregationID.hpp" +#include "storage/HashTableBase.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" +#include "storage/ValueAccessorUtil.hpp" +#include "types/Type.hpp" +#include "types/TypeID.hpp" +#include "types/containers/ColumnVector.hpp" +#include "types/containers/ColumnVectorsValueAccessor.hpp" +#include "utility/Macros.hpp" +#include "utility/ScopedBuffer.hpp" + +#include "glog/logging.h" + +namespace quickstep { + +class ThreadPrivateCompactKeyHashTable : public AggregationStateHashTableBase { + public: + ThreadPrivateCompactKeyHashTable( + const std::vector<const Type *> &key_types, + const std::size_t num_entries, + const std::vector<AggregationHandle *> &handles, + StorageManager *storage_manager) + : key_types_(key_types), + handles_(handles), + bucket_size_(0), + num_buckets_(num_entries), + buckets_allocated_(0) { + for (const Type *key_type : key_types) { + DCHECK(!key_type->isVariableLength()); + + const std::size_t key_size = key_type->maximumByteLength(); + DCHECK(key_size == 
1u || key_size == 2u || key_size == 4u || key_size == 8u); + + key_sizes_.emplace_back(key_size); + } + + for (const AggregationHandle *handle : handles) { + state_offsets_.emplace_back(bucket_size_); + + const std::vector<const Type*> arg_types = handle->getArgumentTypes(); + DCHECK_LE(arg_types.size(), 1u); + + std::size_t state_size = 0; + switch (handle->getAggregationID()) { + case AggregationID::kCount: { + state_size = sizeof(std::int64_t); + break; + } + case AggregationID::kSum: { + DCHECK_EQ(1u, arg_types.size()); + switch (arg_types.front()->getTypeID()) { + case TypeID::kInt: // Fall through + case TypeID::kLong: + state_size = sizeof(std::int64_t); + break; + case TypeID::kFloat: // Fall through + case TypeID::kDouble: + state_size = sizeof(double); + break; + default: + LOG(FATAL) << "Not implemented"; + } + break; + } + default: + LOG(FATAL) << "Not implemented"; + } + bucket_size_ += state_size; + } + + keys_.reset(sizeof(std::uint64_t) * num_buckets_); + buckets_.reset(bucket_size_ * num_buckets_); + } + + ~ThreadPrivateCompactKeyHashTable() override {} + + HashTableImplType getImplType() const override { + return HashTableImplType::kThreadPrivateCompactKey; + } + + void destroyPayload() override {} + + std::size_t getMemoryConsumptionBytes() const override { + return num_buckets_ * (bucket_size_ + sizeof(std::uint64_t)); + } + + inline std::size_t numEntries() const { + return buckets_allocated_; + } + + bool upsertValueAccessorCompositeKey( + const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, + const std::vector<MultiSourceAttributeId> &key_attr_ids, + const ValueAccessorMultiplexer &accessor_mux) override { + ValueAccessor *base_accessor = accessor_mux.getBaseAccessor(); + ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor(); + + DCHECK(base_accessor != nullptr); + const std::size_t num_tuples = base_accessor->getNumTuplesVirtual(); + + ScopedBuffer buffer(sizeof(std::uint64_t) * num_tuples); + std::uint64_t 
*key_codes = static_cast<std::uint64_t*>(buffer.get()); + std::size_t key_code_offset = 0; + for (std::size_t i = 0; i < key_attr_ids.size(); ++i) { + const auto &key_attr_id = key_attr_ids[i]; + ValueAccessor *accessor = + key_attr_id.source == ValueAccessorSource::kBase + ? base_accessor + : derived_accessor; + DCHECK(accessor != nullptr); + + const std::size_t key_size = key_sizes_[i]; + switch (key_size) { + case 1u: + ConstructKeyCode<std::uint8_t>( + key_code_offset, key_attr_id.attr_id, accessor, key_codes); + break; + case 2u: + ConstructKeyCode<std::uint16_t>( + key_code_offset, key_attr_id.attr_id, accessor, key_codes); + break; + case 4u: + ConstructKeyCode<std::uint32_t>( + key_code_offset, key_attr_id.attr_id, accessor, key_codes); + break; + case 8u: + ConstructKeyCode<std::uint64_t>( + key_code_offset, key_attr_id.attr_id, accessor, key_codes); + break; + default: + LOG(FATAL) << "Not implemented"; + } + + key_code_offset += key_size; + } + + std::vector<BucketIndex> bucket_indices; + bucket_indices.reserve(num_tuples); + std::uint64_t *keys = static_cast<std::uint64_t*>(keys_.get()); + for (std::size_t i = 0; i < num_tuples; ++i) { + const std::size_t code = key_codes[i]; + const auto index_it = index_.find(code); + if (index_it == index_.end()) { + // TODO: Resize if overflow + index_.emplace(code, buckets_allocated_); + bucket_indices.emplace_back(buckets_allocated_); + keys[buckets_allocated_] = code; + ++buckets_allocated_; + } else { + bucket_indices.emplace_back(index_it->second); + } + } + + // Dispatch + for (std::size_t i = 0; i < handles_.size(); ++i) { + const AggregationHandle *handle = handles_[i]; + switch (handle->getAggregationID()) { + case AggregationID::kCount: { + upsertValueAccessorCount(bucket_indices, state_offsets_[i]); + break; + } + case AggregationID::kSum: { + DCHECK_EQ(1u, argument_ids[i].size()); + const auto &argument_id = argument_ids[i].front(); + ValueAccessor *accessor = + argument_id.source == 
ValueAccessorSource::kBase + ? base_accessor + : derived_accessor; + DCHECK(accessor != nullptr); + + DCHECK_EQ(1u, handle->getArgumentTypes().size()); + const Type *argument_type = handle->getArgumentTypes().front(); + switch (argument_type->getTypeID()) { + case kInt: { + upsertValueAccessorSum<int, std::int64_t>( + bucket_indices, state_offsets_[i], argument_id.attr_id, accessor); + break; + } + case kLong: { + upsertValueAccessorSum<std::int64_t, std::int64_t>( + bucket_indices, state_offsets_[i], argument_id.attr_id, accessor); + break; + } + case kFloat: { + upsertValueAccessorSum<float, double>( + bucket_indices, state_offsets_[i], argument_id.attr_id, accessor); + break; + } + case kDouble: { + upsertValueAccessorSum<double, double>( + bucket_indices, state_offsets_[i], argument_id.attr_id, accessor); + break; + } + default: + LOG(FATAL) << "Not implemented"; + } + break; + } + default: + LOG(FATAL) << "Not implemented"; + } + } + + return true; + } + + void merge(const ThreadPrivateCompactKeyHashTable *other) { + std::vector<BucketIndex> dst_bucket_indices; + std::uint64_t *dst_keys = static_cast<std::uint64_t*>(keys_.get()); + + const char *src_buckets_start = + static_cast<const char*>(other->buckets_.get()); + const std::uint64_t *src_keys = + static_cast<const std::uint64_t*>(other->keys_.get()); + + for (std::size_t i = 0; i < other->buckets_allocated_; ++i) { + const std::uint64_t code = src_keys[i]; + const auto index_it = index_.find(code); + + if (index_it == index_.end()) { + // TODO: Resize if overflow + index_.emplace(code, buckets_allocated_); + dst_bucket_indices.emplace_back(buckets_allocated_); + dst_keys[buckets_allocated_] = code; + ++buckets_allocated_; + } else { + dst_bucket_indices.emplace_back(index_it->second); + } + } + + // Dispatch + for (std::size_t i = 0; i < handles_.size(); ++i) { + const AggregationHandle *handle = handles_[i]; + switch (handle->getAggregationID()) { + case AggregationID::kCount: { + 
mergeStateSum<std::int64_t>( + dst_bucket_indices, src_buckets_start, state_offsets_[i]); + break; + } + case AggregationID::kSum: { + const Type *argument_type = handle->getArgumentTypes().front(); + switch (argument_type->getTypeID()) { + case kInt: // Fall through + case kLong: { + mergeStateSum<std::int64_t>( + dst_bucket_indices, src_buckets_start, state_offsets_[i]); + break; + } + case kFloat: // Fall through + case kDouble: { + mergeStateSum<double>( + dst_bucket_indices, src_buckets_start, state_offsets_[i]); + break; + } + default: + LOG(FATAL) << "Not implemented"; + } + break; + } + default: + LOG(FATAL) << "Not implemented"; + } + } + } + + void print() const { + std::cout << "num_entries = " << buckets_allocated_ << "\n"; + const double *values = static_cast<const double*>(buckets_.get()); + for (std::size_t i = 0; i < buckets_allocated_; ++i) { + std::cout << values[i] << "\n"; + } + } + + void finalize(ColumnVectorsValueAccessor *output) const { + std::size_t key_offset = 0; + for (std::size_t i = 0; i < key_types_.size(); ++i) { + const Type &key_type = *key_types_[i]; + std::unique_ptr<NativeColumnVector> native_cv( + new NativeColumnVector(key_type, buckets_allocated_)); + + const std::size_t key_size = key_sizes_[i]; + switch (key_size) { + case 1u: + finalizeKey<std::uint8_t>(key_offset, native_cv.get()); + break; + case 2u: + finalizeKey<std::uint16_t>(key_offset, native_cv.get()); + break; + case 4u: + finalizeKey<std::uint32_t>(key_offset, native_cv.get()); + break; + case 8u: + finalizeKey<std::uint64_t>(key_offset, native_cv.get()); + break; + default: + LOG(FATAL) << "Not implemented"; + } + output->addColumn(native_cv.release()); + + key_offset += key_size; + } + + // Dispatch + for (std::size_t i = 0; i < handles_.size(); ++i) { + const AggregationHandle *handle = handles_[i]; + const Type &result_type = *handle->getResultType(); + std::unique_ptr<NativeColumnVector> native_cv( + new NativeColumnVector(result_type, buckets_allocated_)); 
+ + switch (handle->getAggregationID()) { + case AggregationID::kCount: { + finalizeStateSum<std::int64_t, std::int64_t>( + state_offsets_[i], native_cv.get()); + break; + } + case AggregationID::kSum: { + const Type *argument_type = handle->getArgumentTypes().front(); + switch (argument_type->getTypeID()) { + case kInt: // Fall through + case kLong: { + finalizeStateSum<std::int64_t, std::int64_t>( + state_offsets_[i], native_cv.get()); + break; + } + case kFloat: // Fall through + case kDouble: { + finalizeStateSum<double, double>( + state_offsets_[i], native_cv.get()); + break; + } + default: + LOG(FATAL) << "Not implemented"; + } + break; + } + default: + LOG(FATAL) << "Not implemented"; + } + output->addColumn(native_cv.release()); + } + } + + private: + using BucketIndex = std::uint32_t; + + template <typename KeyT> + inline static void ConstructKeyCode(const std::size_t offset, + const attribute_id attr_id, + ValueAccessor *accessor, + void *key_code_start) { + InvokeOnAnyValueAccessor( + accessor, + [&](auto *accessor) -> void { // NOLINT(build/c++11) + char *key_code_ptr = static_cast<char*>(key_code_start) + offset; + accessor->beginIteration(); + while (accessor->next()) { + *reinterpret_cast<KeyT*>(key_code_ptr) = + *static_cast<const KeyT*>( + accessor->template getUntypedValue<false>(attr_id)); + key_code_ptr += sizeof(std::uint64_t); + } + }); + } + + inline void upsertValueAccessorCount(const std::vector<BucketIndex> &bucket_indices, + const std::size_t state_offset) { + char *state_start = static_cast<char*>(buckets_.get()) + state_offset; + for (const BucketIndex idx : bucket_indices) { + char *state_ptr = state_start + bucket_size_ * idx; + *reinterpret_cast<std::int64_t*>(state_ptr) += 1; + } + } + + template <typename ArgumentT, typename StateT> + inline void upsertValueAccessorSum(const std::vector<BucketIndex> &bucket_indices, + const std::size_t state_offset, + const attribute_id attr_id, + ValueAccessor *accessor) { + 
InvokeOnAnyValueAccessor( + accessor, + [&](auto *accessor) -> void { // NOLINT(build/c++11) + accessor->beginIteration(); + + char *state_start = static_cast<char*>(buckets_.get()) + state_offset; + std::size_t loc = 0; + while (accessor->next()) { + char *state_ptr = state_start + bucket_size_ * bucket_indices[loc]; + *reinterpret_cast<StateT*>(state_ptr) += + *static_cast<const ArgumentT*>( + accessor->template getUntypedValue<false>(attr_id)); + ++loc; + } + }); + } + + template <typename StateT> + inline void mergeStateSum(const std::vector<BucketIndex> &dst_bucket_indices, + const void *src_buckets_start, + const std::size_t state_offset) { + char *dst_state_start = static_cast<char*>(buckets_.get()) + state_offset; + const char* src_state_start = + static_cast<const char*>(src_buckets_start) + state_offset; + for (std::size_t i = 0; i < dst_bucket_indices.size(); ++i) { + char *dst_state_ptr = dst_state_start + bucket_size_ * dst_bucket_indices[i]; + const char *src_state_ptr = src_state_start + bucket_size_ * i; + *reinterpret_cast<StateT*>(dst_state_ptr) += + *reinterpret_cast<const StateT*>(src_state_ptr); + } + } + + template <typename KeyT> + inline void finalizeKey(const std::size_t offset, + NativeColumnVector *output_cv) const { + const char *key_ptr = static_cast<const char*>(keys_.get()) + offset; + for (std::size_t i = 0; i < buckets_allocated_; ++i) { + *static_cast<KeyT*>(output_cv->getPtrForDirectWrite()) = + *reinterpret_cast<const KeyT*>(key_ptr); + key_ptr += sizeof(std::uint64_t); + } + } + + template <typename StateT, typename ResultT> + inline void finalizeStateSum(const std::size_t state_offset, + NativeColumnVector *output_cv) const { + char *state_ptr = static_cast<char*>(buckets_.get()) + state_offset; + for (std::size_t i = 0; i < buckets_allocated_; ++i) { + *static_cast<ResultT*>(output_cv->getPtrForDirectWrite()) = + *reinterpret_cast<const StateT*>(state_ptr); + state_ptr += bucket_size_; + } + } + + const std::vector<const 
Type*> key_types_; + const std::vector<AggregationHandle *> handles_; + + std::vector<std::size_t> key_sizes_; + std::vector<std::size_t> state_offsets_; + std::size_t bucket_size_; + + std::unordered_map<std::uint64_t, BucketIndex> index_; + + std::size_t num_buckets_; + std::size_t buckets_allocated_; + + ScopedBuffer keys_; + ScopedBuffer buckets_; + + DISALLOW_COPY_AND_ASSIGN(ThreadPrivateCompactKeyHashTable); +}; + +} // namespace quickstep + +#endif // QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/ThreadPrivateNumericHashTable.cpp ---------------------------------------------------------------------- diff --git a/storage/ThreadPrivateNumericHashTable.cpp b/storage/ThreadPrivateNumericHashTable.cpp deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/675dcf8d/storage/ThreadPrivateNumericHashTable.hpp ---------------------------------------------------------------------- diff --git a/storage/ThreadPrivateNumericHashTable.hpp b/storage/ThreadPrivateNumericHashTable.hpp deleted file mode 100644 index 2991900..0000000 --- a/storage/ThreadPrivateNumericHashTable.hpp +++ /dev/null @@ -1,483 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#ifndef QUICKSTEP_STORAGE_THREAD_PRIVATE_NUMERIC_HASH_TABLE_HPP_ -#define QUICKSTEP_STORAGE_THREAD_PRIVATE_NUMERIC_HASH_TABLE_HPP_ - -#include <algorithm> -#include <cstddef> -#include <cstdint> -#include <unordered_map> -#include <vector> - -#include "catalog/CatalogTypedefs.hpp" -#include "expressions/aggregation/AggregationHandle.hpp" -#include "expressions/aggregation/AggregationID.hpp" -#include "storage/HashTableBase.hpp" -#include "storage/ValueAccessorMultiplexer.hpp" -#include "storage/ValueAccessorUtil.hpp" -#include "types/Type.hpp" -#include "types/TypeID.hpp" -#include "types/containers/ColumnVector.hpp" -#include "types/containers/ColumnVectorsValueAccessor.hpp" -#include "utility/Macros.hpp" -#include "utility/ScopedBuffer.hpp" - -#include "glog/logging.h" - -namespace quickstep { - -class ThreadPrivateNumericHashTable : public AggregationStateHashTableBase { - public: - ThreadPrivateNumericHashTable( - const std::vector<const Type *> &key_types, - const std::size_t num_entries, - const std::vector<AggregationHandle *> &handles, - StorageManager *storage_manager) - : key_types_(key_types), - handles_(handles), - bucket_size_(0), - num_buckets_(num_entries), - buckets_allocated_(0) { - for (const Type *key_type : key_types) { - DCHECK(!key_type->isVariableLength()); - - const std::size_t key_size = key_type->maximumByteLength(); - DCHECK(key_size == 1u || key_size == 2u || key_size == 4u || key_size == 8u); - - key_sizes_.emplace_back(key_size); - } - - for (const AggregationHandle *handle : handles) { - state_offsets_.emplace_back(bucket_size_); - - const std::vector<const Type*> arg_types = handle->getArgumentTypes(); - DCHECK_LE(arg_types.size(), 1u); - - std::size_t state_size = 0; - switch (handle->getAggregationID()) { - case AggregationID::kCount: { - state_size = sizeof(std::int64_t); - break; - } - case AggregationID::kSum: { - 
DCHECK_EQ(1u, arg_types.size()); - switch (arg_types.front()->getTypeID()) { - case TypeID::kInt: // Fall through - case TypeID::kLong: - state_size = sizeof(std::int64_t); - break; - case TypeID::kFloat: // Fall through - case TypeID::kDouble: - state_size = sizeof(double); - break; - default: - LOG(FATAL) << "Not implemented"; - } - break; - } - default: - LOG(FATAL) << "Not implemented"; - } - bucket_size_ += state_size; - } - - keys_.reset(sizeof(std::uint64_t) * num_buckets_); - buckets_.reset(bucket_size_ * num_buckets_); - } - - ~ThreadPrivateNumericHashTable() override {} - - HashTableImplType getImplType() const override { - return HashTableImplType::kThreadPrivateNumeric; - } - - void destroyPayload() override {} - - std::size_t getMemoryConsumptionBytes() const override { - return num_buckets_ * (bucket_size_ + sizeof(std::uint64_t)); - } - - inline std::size_t numEntries() const { - return buckets_allocated_; - } - - bool upsertValueAccessorCompositeKey( - const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids, - const std::vector<MultiSourceAttributeId> &key_attr_ids, - const ValueAccessorMultiplexer &accessor_mux) override { - ValueAccessor *base_accessor = accessor_mux.getBaseAccessor(); - ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor(); - - DCHECK(base_accessor != nullptr); - const std::size_t num_tuples = base_accessor->getNumTuplesVirtual(); - - ScopedBuffer buffer(sizeof(std::uint64_t) * num_tuples); - std::uint64_t *key_codes = static_cast<std::uint64_t*>(buffer.get()); - std::size_t key_code_offset = 0; - for (std::size_t i = 0; i < key_attr_ids.size(); ++i) { - const auto &key_attr_id = key_attr_ids[i]; - ValueAccessor *accessor = - key_attr_id.source == ValueAccessorSource::kBase - ? 
base_accessor - : derived_accessor; - DCHECK(accessor != nullptr); - - const std::size_t key_size = key_sizes_[i]; - switch (key_size) { - case 1u: - ConstructKeyCode<std::uint8_t>( - key_code_offset, key_attr_id.attr_id, accessor, key_codes); - break; - case 2u: - ConstructKeyCode<std::uint16_t>( - key_code_offset, key_attr_id.attr_id, accessor, key_codes); - break; - case 4u: - ConstructKeyCode<std::uint32_t>( - key_code_offset, key_attr_id.attr_id, accessor, key_codes); - break; - case 8u: - ConstructKeyCode<std::uint64_t>( - key_code_offset, key_attr_id.attr_id, accessor, key_codes); - break; - default: - LOG(FATAL) << "Not implemented"; - } - - key_code_offset += key_size; - } - - std::vector<BucketIndex> bucket_indices; - bucket_indices.reserve(num_tuples); - std::uint64_t *keys = static_cast<std::uint64_t*>(keys_.get()); - for (std::size_t i = 0; i < num_tuples; ++i) { - const std::size_t code = key_codes[i]; - const auto index_it = index_.find(code); - if (index_it == index_.end()) { - // TODO: Resize if overflow - index_.emplace(code, buckets_allocated_); - bucket_indices.emplace_back(buckets_allocated_); - keys[buckets_allocated_] = code; - ++buckets_allocated_; - } else { - bucket_indices.emplace_back(index_it->second); - } - } - - // Dispatch - for (std::size_t i = 0; i < handles_.size(); ++i) { - const AggregationHandle *handle = handles_[i]; - switch (handle->getAggregationID()) { - case AggregationID::kCount: { - upsertValueAccessorCount(bucket_indices, state_offsets_[i]); - break; - } - case AggregationID::kSum: { - DCHECK_EQ(1u, argument_ids[i].size()); - const auto &argument_id = argument_ids[i].front(); - ValueAccessor *accessor = - argument_id.source == ValueAccessorSource::kBase - ? 
base_accessor - : derived_accessor; - DCHECK(accessor != nullptr); - - DCHECK_EQ(1u, handle->getArgumentTypes().size()); - const Type *argument_type = handle->getArgumentTypes().front(); - switch (argument_type->getTypeID()) { - case kInt: { - upsertValueAccessorSum<int, std::int64_t>( - bucket_indices, state_offsets_[i], argument_id.attr_id, accessor); - break; - } - case kLong: { - upsertValueAccessorSum<std::int64_t, std::int64_t>( - bucket_indices, state_offsets_[i], argument_id.attr_id, accessor); - break; - } - case kFloat: { - upsertValueAccessorSum<float, double>( - bucket_indices, state_offsets_[i], argument_id.attr_id, accessor); - break; - } - case kDouble: { - upsertValueAccessorSum<double, double>( - bucket_indices, state_offsets_[i], argument_id.attr_id, accessor); - break; - } - default: - LOG(FATAL) << "Not implemented"; - } - break; - } - default: - LOG(FATAL) << "Not implemented"; - } - } - - return true; - } - - void merge(const ThreadPrivateNumericHashTable *other) { - std::vector<BucketIndex> dst_bucket_indices; - std::uint64_t *dst_keys = static_cast<std::uint64_t*>(keys_.get()); - - const char *src_buckets_start = - static_cast<const char*>(other->buckets_.get()); - const std::uint64_t *src_keys = - static_cast<const std::uint64_t*>(other->keys_.get()); - - for (std::size_t i = 0; i < other->buckets_allocated_; ++i) { - const std::uint64_t code = src_keys[i]; - const auto index_it = index_.find(code); - - if (index_it == index_.end()) { - // TODO: Resize if overflow - index_.emplace(code, buckets_allocated_); - dst_bucket_indices.emplace_back(buckets_allocated_); - dst_keys[buckets_allocated_] = code; - ++buckets_allocated_; - } else { - dst_bucket_indices.emplace_back(index_it->second); - } - } - - // Dispatch - for (std::size_t i = 0; i < handles_.size(); ++i) { - const AggregationHandle *handle = handles_[i]; - switch (handle->getAggregationID()) { - case AggregationID::kCount: { - mergeStateSum<std::int64_t>( - dst_bucket_indices, 
src_buckets_start, state_offsets_[i]); - break; - } - case AggregationID::kSum: { - const Type *argument_type = handle->getArgumentTypes().front(); - switch (argument_type->getTypeID()) { - case kInt: // Fall through - case kLong: { - mergeStateSum<std::int64_t>( - dst_bucket_indices, src_buckets_start, state_offsets_[i]); - break; - } - case kFloat: // Fall through - case kDouble: { - mergeStateSum<double>( - dst_bucket_indices, src_buckets_start, state_offsets_[i]); - break; - } - default: - LOG(FATAL) << "Not implemented"; - } - break; - } - default: - LOG(FATAL) << "Not implemented"; - } - } - } - - void print() const { - std::cout << "num_entries = " << buckets_allocated_ << "\n"; - const double *values = static_cast<const double*>(buckets_.get()); - for (std::size_t i = 0; i < buckets_allocated_; ++i) { - std::cout << values[i] << "\n"; - } - } - - void finalize(ColumnVectorsValueAccessor *output) const { - std::size_t key_offset = 0; - for (std::size_t i = 0; i < key_types_.size(); ++i) { - const Type &key_type = *key_types_[i]; - std::unique_ptr<NativeColumnVector> native_cv( - new NativeColumnVector(key_type, buckets_allocated_)); - - const std::size_t key_size = key_sizes_[i]; - switch (key_size) { - case 1u: - finalizeKey<std::uint8_t>(key_offset, native_cv.get()); - break; - case 2u: - finalizeKey<std::uint16_t>(key_offset, native_cv.get()); - break; - case 4u: - finalizeKey<std::uint32_t>(key_offset, native_cv.get()); - break; - case 8u: - finalizeKey<std::uint64_t>(key_offset, native_cv.get()); - break; - default: - LOG(FATAL) << "Not implemented"; - } - output->addColumn(native_cv.release()); - - key_offset += key_size; - } - - // Dispatch - for (std::size_t i = 0; i < handles_.size(); ++i) { - const AggregationHandle *handle = handles_[i]; - const Type &result_type = *handle->getResultType(); - std::unique_ptr<NativeColumnVector> native_cv( - new NativeColumnVector(result_type, buckets_allocated_)); - - switch (handle->getAggregationID()) { - case 
AggregationID::kCount: { - finalizeStateSum<std::int64_t, std::int64_t>( - state_offsets_[i], native_cv.get()); - break; - } - case AggregationID::kSum: { - const Type *argument_type = handle->getArgumentTypes().front(); - switch (argument_type->getTypeID()) { - case kInt: // Fall through - case kLong: { - finalizeStateSum<std::int64_t, std::int64_t>( - state_offsets_[i], native_cv.get()); - break; - } - case kFloat: // Fall through - case kDouble: { - finalizeStateSum<double, double>( - state_offsets_[i], native_cv.get()); - break; - } - default: - LOG(FATAL) << "Not implemented"; - } - break; - } - default: - LOG(FATAL) << "Not implemented"; - } - output->addColumn(native_cv.release()); - } - } - - private: - using BucketIndex = std::uint32_t; - - template <typename KeyT> - inline static void ConstructKeyCode(const std::size_t offset, - const attribute_id attr_id, - ValueAccessor *accessor, - void *key_code_start) { - InvokeOnAnyValueAccessor( - accessor, - [&](auto *accessor) -> void { // NOLINT(build/c++11) - char *key_code_ptr = static_cast<char*>(key_code_start) + offset; - accessor->beginIteration(); - while (accessor->next()) { - *reinterpret_cast<KeyT*>(key_code_ptr) = - *static_cast<const KeyT*>( - accessor->template getUntypedValue<false>(attr_id)); - key_code_ptr += sizeof(std::uint64_t); - } - }); - } - - inline void upsertValueAccessorCount(const std::vector<BucketIndex> &bucket_indices, - const std::size_t state_offset) { - char *state_start = static_cast<char*>(buckets_.get()) + state_offset; - for (const BucketIndex idx : bucket_indices) { - char *state_ptr = state_start + bucket_size_ * idx; - *reinterpret_cast<std::int64_t*>(state_ptr) += 1; - } - } - - template <typename ArgumentT, typename StateT> - inline void upsertValueAccessorSum(const std::vector<BucketIndex> &bucket_indices, - const std::size_t state_offset, - const attribute_id attr_id, - ValueAccessor *accessor) { - InvokeOnAnyValueAccessor( - accessor, - [&](auto *accessor) -> void { 
// NOLINT(build/c++11) - accessor->beginIteration(); - - char *state_start = static_cast<char*>(buckets_.get()) + state_offset; - std::size_t loc = 0; - while (accessor->next()) { - char *state_ptr = state_start + bucket_size_ * bucket_indices[loc]; - *reinterpret_cast<StateT*>(state_ptr) += - *static_cast<const ArgumentT*>( - accessor->template getUntypedValue<false>(attr_id)); - ++loc; - } - }); - } - - template <typename StateT> - inline void mergeStateSum(const std::vector<BucketIndex> &dst_bucket_indices, - const void *src_buckets_start, - const std::size_t state_offset) { - char *dst_state_start = static_cast<char*>(buckets_.get()) + state_offset; - const char* src_state_start = - static_cast<const char*>(src_buckets_start) + state_offset; - for (std::size_t i = 0; i < dst_bucket_indices.size(); ++i) { - char *dst_state_ptr = dst_state_start + bucket_size_ * dst_bucket_indices[i]; - const char *src_state_ptr = src_state_start + bucket_size_ * i; - *reinterpret_cast<StateT*>(dst_state_ptr) += - *reinterpret_cast<const StateT*>(src_state_ptr); - } - } - - template <typename KeyT> - inline void finalizeKey(const std::size_t offset, - NativeColumnVector *output_cv) const { - const char *key_ptr = static_cast<const char*>(keys_.get()) + offset; - for (std::size_t i = 0; i < buckets_allocated_; ++i) { - *static_cast<KeyT*>(output_cv->getPtrForDirectWrite()) = - *reinterpret_cast<const KeyT*>(key_ptr); - key_ptr += sizeof(std::uint64_t); - } - } - - template <typename StateT, typename ResultT> - inline void finalizeStateSum(const std::size_t state_offset, - NativeColumnVector *output_cv) const { - char *state_ptr = static_cast<char*>(buckets_.get()) + state_offset; - for (std::size_t i = 0; i < buckets_allocated_; ++i) { - *static_cast<ResultT*>(output_cv->getPtrForDirectWrite()) = - *reinterpret_cast<const StateT*>(state_ptr); - state_ptr += bucket_size_; - } - } - - const std::vector<const Type*> key_types_; - const std::vector<AggregationHandle *> handles_; - - 
std::vector<std::size_t> key_sizes_; - std::vector<std::size_t> state_offsets_; - std::size_t bucket_size_; - - std::unordered_map<std::uint64_t, BucketIndex> index_; - - std::size_t num_buckets_; - std::size_t buckets_allocated_; - - ScopedBuffer keys_; - ScopedBuffer buckets_; - - DISALLOW_COPY_AND_ASSIGN(ThreadPrivateNumericHashTable); -}; - -} // namespace quickstep - -#endif // QUICKSTEP_STORAGE_THREAD_PRIVATE_NUMERIC_HASH_TABLE_HPP_