http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7285f907/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp b/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp index 6e6d188..798ba76 100644 --- a/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp +++ b/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp @@ -32,8 +32,9 @@ #include "expressions/aggregation/AggregationHandleMin.hpp" #include "expressions/aggregation/AggregationID.hpp" #include "storage/AggregationOperationState.hpp" -#include "storage/FastHashTableFactory.hpp" +#include "storage/PackedPayloadHashTable.hpp" #include "storage/StorageManager.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" #include "types/CharType.hpp" #include "types/DatetimeIntervalType.hpp" #include "types/DatetimeLit.hpp" @@ -50,10 +51,7 @@ #include "types/VarCharType.hpp" #include "types/YearMonthIntervalType.hpp" #include "types/containers/ColumnVector.hpp" - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION #include "types/containers/ColumnVectorsValueAccessor.hpp" -#endif #include "types/operations/comparisons/Comparison.hpp" #include "types/operations/comparisons/ComparisonFactory.hpp" @@ -222,34 +220,6 @@ class AggregationHandleMinTest : public ::testing::Test { } template <typename GenericType> - void checkAggregationMinGenericColumnVector() { - const GenericType &type = GenericType::Instance(true); - initializeHandle(type); - EXPECT_TRUE( - aggregation_handle_min_->finalize(*aggregation_handle_min_state_) - .isNull()); - - typename GenericType::cpptype min; - std::vector<std::unique_ptr<ColumnVector>> column_vectors; - column_vectors.emplace_back( - createColumnVectorGeneric<GenericType>(type, &min)); - - std::unique_ptr<AggregationState> cv_state( - 
aggregation_handle_min_->accumulateColumnVectors(column_vectors)); - - // Test the state generated directly by accumulateColumnVectors(), and also - // test after merging back. - CheckMinValue<typename GenericType::cpptype>( - min, *aggregation_handle_min_, *cv_state); - - aggregation_handle_min_->mergeStates(*cv_state, - aggregation_handle_min_state_.get()); - CheckMinValue<typename GenericType::cpptype>( - min, *aggregation_handle_min_, *aggregation_handle_min_state_); - } - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - template <typename GenericType> void checkAggregationMinGenericValueAccessor() { const GenericType &type = GenericType::Instance(true); initializeHandle(type); @@ -265,7 +235,8 @@ class AggregationHandleMinTest : public ::testing::Test { std::unique_ptr<AggregationState> va_state( aggregation_handle_min_->accumulateValueAccessor( - accessor.get(), std::vector<attribute_id>(1, 0))); + {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)}, + ValueAccessorMultiplexer(accessor.get()))); // Test the state generated directly by accumulateValueAccessor(), and also // test after merging back. 
@@ -277,7 +248,6 @@ class AggregationHandleMinTest : public ::testing::Test { CheckMinValue<typename GenericType::cpptype>( min, *aggregation_handle_min_, *aggregation_handle_min_state_); } -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION template <typename StringType> void checkAggregationMinString() { @@ -382,33 +352,6 @@ class AggregationHandleMinTest : public ::testing::Test { } template <typename StringType, typename ColumnVectorType> - void checkAggregationMinStringColumnVector() { - const StringType &type = StringType::Instance(10, true); - initializeHandle(type); - EXPECT_TRUE( - aggregation_handle_min_->finalize(*aggregation_handle_min_state_) - .isNull()); - - std::string min; - std::vector<std::unique_ptr<ColumnVector>> column_vectors; - column_vectors.emplace_back( - createColumnVectorString<ColumnVectorType>(type, &min)); - - std::unique_ptr<AggregationState> cv_state( - aggregation_handle_min_->accumulateColumnVectors(column_vectors)); - - // Test the state generated directly by accumulateColumnVectors(), and also - // test after merging back. 
- CheckMinString(min, *aggregation_handle_min_, *cv_state); - - aggregation_handle_min_->mergeStates(*cv_state, - aggregation_handle_min_state_.get()); - CheckMinString( - min, *aggregation_handle_min_, *aggregation_handle_min_state_); - } - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - template <typename StringType, typename ColumnVectorType> void checkAggregationMinStringValueAccessor() { const StringType &type = StringType::Instance(10, true); initializeHandle(type); @@ -423,7 +366,8 @@ class AggregationHandleMinTest : public ::testing::Test { std::unique_ptr<AggregationState> va_state( aggregation_handle_min_->accumulateValueAccessor( - accessor.get(), std::vector<attribute_id>(1, 0))); + {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)}, + ValueAccessorMultiplexer(accessor.get()))); // Test the state generated directly by accumulateValueAccessor(), and also // test after merging back. @@ -434,7 +378,6 @@ class AggregationHandleMinTest : public ::testing::Test { CheckMinString( min, *aggregation_handle_min_, *aggregation_handle_min_state_); } -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION std::unique_ptr<AggregationHandle> aggregation_handle_min_; std::unique_ptr<AggregationState> aggregation_handle_min_state_; @@ -511,43 +454,6 @@ TEST_F(AggregationHandleMinTest, VarCharTypeTest) { checkAggregationMinString<VarCharType>(); } -TEST_F(AggregationHandleMinTest, IntTypeColumnVectorTest) { - checkAggregationMinGenericColumnVector<IntType>(); -} - -TEST_F(AggregationHandleMinTest, LongTypeColumnVectorTest) { - checkAggregationMinGenericColumnVector<LongType>(); -} - -TEST_F(AggregationHandleMinTest, FloatTypeColumnVectorTest) { - checkAggregationMinGenericColumnVector<FloatType>(); -} - -TEST_F(AggregationHandleMinTest, DoubleTypeColumnVectorTest) { - checkAggregationMinGenericColumnVector<DoubleType>(); -} - -TEST_F(AggregationHandleMinTest, DatetimeTypeColumnVectorTest) { - checkAggregationMinGenericColumnVector<DatetimeType>(); -} - 
-TEST_F(AggregationHandleMinTest, DatetimeIntervalTypeColumnVectorTest) { - checkAggregationMinGenericColumnVector<DatetimeIntervalType>(); -} - -TEST_F(AggregationHandleMinTest, YearMonthIntervalTypeColumnVectorTest) { - checkAggregationMinGenericColumnVector<YearMonthIntervalType>(); -} - -TEST_F(AggregationHandleMinTest, CharTypeColumnVectorTest) { - checkAggregationMinStringColumnVector<CharType, NativeColumnVector>(); -} - -TEST_F(AggregationHandleMinTest, VarCharTypeColumnVectorTest) { - checkAggregationMinStringColumnVector<VarCharType, IndirectColumnVector>(); -} - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION TEST_F(AggregationHandleMinTest, IntTypeValueAccessorTest) { checkAggregationMinGenericValueAccessor<IntType>(); } @@ -583,7 +489,6 @@ TEST_F(AggregationHandleMinTest, CharTypeValueAccessorTest) { TEST_F(AggregationHandleMinTest, VarCharTypeValueAccessorTest) { checkAggregationMinStringValueAccessor<VarCharType, IndirectColumnVector>(); } -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION #ifdef QUICKSTEP_DEBUG TEST_F(AggregationHandleMinDeathTest, WrongTypeTest) { @@ -685,28 +590,25 @@ TEST_F(AggregationHandleMinTest, GroupByTableMergeTest) { initializeHandle(int_non_null_type); storage_manager_.reset(new StorageManager("./test_min_data")); std::unique_ptr<AggregationStateHashTableBase> source_hash_table( - AggregationStateFastHashTableFactory::CreateResizable( + AggregationStateHashTableFactory::CreateResizable( HashTableImplType::kSeparateChaining, std::vector<const Type *>(1, &int_non_null_type), 10, - {aggregation_handle_min_.get()->getPayloadSize()}, {aggregation_handle_min_.get()}, storage_manager_.get())); std::unique_ptr<AggregationStateHashTableBase> destination_hash_table( - AggregationStateFastHashTableFactory::CreateResizable( + AggregationStateHashTableFactory::CreateResizable( HashTableImplType::kSeparateChaining, std::vector<const Type *>(1, &int_non_null_type), 10, - {aggregation_handle_min_.get()->getPayloadSize()}, 
{aggregation_handle_min_.get()}, storage_manager_.get())); - AggregationStateFastHashTable *destination_hash_table_derived = - static_cast<AggregationStateFastHashTable *>( - destination_hash_table.get()); + PackedPayloadHashTable *destination_hash_table_derived = + static_cast<PackedPayloadHashTable *>(destination_hash_table.get()); - AggregationStateFastHashTable *source_hash_table_derived = - static_cast<AggregationStateFastHashTable *>(source_hash_table.get()); + PackedPayloadHashTable *source_hash_table_derived = + static_cast<PackedPayloadHashTable *>(source_hash_table.get()); AggregationHandleMin *aggregation_handle_min_derived = static_cast<AggregationHandleMin *>(aggregation_handle_min_.get()); @@ -776,47 +678,47 @@ TEST_F(AggregationHandleMinTest, GroupByTableMergeTest) { memcpy(buffer + 1, common_key_source_state.get()->getPayloadAddress(), aggregation_handle_min_.get()->getPayloadSize()); - source_hash_table_derived->putCompositeKey(common_key, buffer); + source_hash_table_derived->upsertCompositeKey(common_key, buffer); memcpy(buffer + 1, common_key_destination_state.get()->getPayloadAddress(), aggregation_handle_min_.get()->getPayloadSize()); - destination_hash_table_derived->putCompositeKey(common_key, buffer); + destination_hash_table_derived->upsertCompositeKey(common_key, buffer); memcpy(buffer + 1, exclusive_key_source_state.get()->getPayloadAddress(), aggregation_handle_min_.get()->getPayloadSize()); - source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer); + source_hash_table_derived->upsertCompositeKey(exclusive_source_key, buffer); memcpy(buffer + 1, exclusive_key_destination_state.get()->getPayloadAddress(), aggregation_handle_min_.get()->getPayloadSize()); - destination_hash_table_derived->putCompositeKey(exclusive_destination_key, - buffer); + destination_hash_table_derived->upsertCompositeKey(exclusive_destination_key, + buffer); EXPECT_EQ(2u, destination_hash_table_derived->numEntries()); EXPECT_EQ(2u, 
source_hash_table_derived->numEntries()); - AggregationOperationState::mergeGroupByHashTables( - source_hash_table.get(), destination_hash_table.get()); + HashTableMerger merger(destination_hash_table_derived); + source_hash_table_derived->forEachCompositeKey(&merger); EXPECT_EQ(3u, destination_hash_table_derived->numEntries()); CheckMinValue<int>( common_key_source_min_val.getLiteral<int>(), - aggregation_handle_min_derived->finalizeHashTableEntryFast( + aggregation_handle_min_derived->finalizeHashTableEntry( destination_hash_table_derived->getSingleCompositeKey(common_key) + 1)); - CheckMinValue<int>(exclusive_key_destination_min_val.getLiteral<int>(), - aggregation_handle_min_derived->finalizeHashTableEntryFast( - destination_hash_table_derived->getSingleCompositeKey( - exclusive_destination_key) + - 1)); - CheckMinValue<int>(exclusive_key_source_min_val.getLiteral<int>(), - aggregation_handle_min_derived->finalizeHashTableEntryFast( - source_hash_table_derived->getSingleCompositeKey( - exclusive_source_key) + - 1)); + CheckMinValue<int>( + exclusive_key_destination_min_val.getLiteral<int>(), + aggregation_handle_min_derived->finalizeHashTableEntry( + destination_hash_table_derived->getSingleCompositeKey( + exclusive_destination_key) + 1)); + CheckMinValue<int>( + exclusive_key_source_min_val.getLiteral<int>(), + aggregation_handle_min_derived->finalizeHashTableEntry( + source_hash_table_derived->getSingleCompositeKey( + exclusive_source_key) + 1)); } } // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7285f907/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp ---------------------------------------------------------------------- diff --git a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp index 1d1c084..31a35a0 100644 --- a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp +++ b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp @@ -29,8 +29,9 @@ #include "expressions/aggregation/AggregationHandleSum.hpp" #include "expressions/aggregation/AggregationID.hpp" #include "storage/AggregationOperationState.hpp" -#include "storage/FastHashTableFactory.hpp" +#include "storage/PackedPayloadHashTable.hpp" #include "storage/StorageManager.hpp" +#include "storage/ValueAccessorMultiplexer.hpp" #include "types/CharType.hpp" #include "types/DatetimeIntervalType.hpp" #include "types/DoubleType.hpp" @@ -45,10 +46,7 @@ #include "types/VarCharType.hpp" #include "types/YearMonthIntervalType.hpp" #include "types/containers/ColumnVector.hpp" - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION #include "types/containers/ColumnVectorsValueAccessor.hpp" -#endif #include "gtest/gtest.h" @@ -186,36 +184,6 @@ class AggregationHandleSumTest : public ::testing::Test { } template <typename GenericType, typename PrecisionType> - void checkAggregationSumGenericColumnVector() { - const GenericType &type = GenericType::Instance(true); - - initializeHandle(type); - EXPECT_TRUE( - aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_) - .isNull()); - - typename PrecisionType::cpptype sum; - std::vector<std::unique_ptr<ColumnVector>> column_vectors; - column_vectors.emplace_back( - createColumnVectorGeneric<GenericType, typename PrecisionType::cpptype>( - type, &sum)); - - std::unique_ptr<AggregationState> cv_state( - aggregation_handle_sum_->accumulateColumnVectors(column_vectors)); - - // Test the 
state generated directly by accumulateColumnVectors(), and also - // test after merging back. - CheckSumValue<typename PrecisionType::cpptype>( - sum, *aggregation_handle_sum_, *cv_state); - - aggregation_handle_sum_->mergeStates(*cv_state, - aggregation_handle_sum_state_.get()); - CheckSumValue<typename PrecisionType::cpptype>( - sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_); - } - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION - template <typename GenericType, typename PrecisionType> void checkAggregationSumGenericValueAccessor() { const GenericType &type = GenericType::Instance(true); @@ -233,7 +201,8 @@ class AggregationHandleSumTest : public ::testing::Test { std::unique_ptr<AggregationState> va_state( aggregation_handle_sum_->accumulateValueAccessor( - accessor.get(), std::vector<attribute_id>(1, 0))); + {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)}, + ValueAccessorMultiplexer(accessor.get()))); // Test the state generated directly by accumulateValueAccessor(), and also // test after merging back. 
@@ -245,7 +214,6 @@ class AggregationHandleSumTest : public ::testing::Test { CheckSumValue<typename PrecisionType::cpptype>( sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_); } -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION std::unique_ptr<AggregationHandle> aggregation_handle_sum_; std::unique_ptr<AggregationState> aggregation_handle_sum_state_; @@ -306,33 +274,6 @@ TEST_F(AggregationHandleSumTest, YearMonthIntervalTypeTest) { checkAggregationSumGeneric<YearMonthIntervalType, YearMonthIntervalType>(); } -TEST_F(AggregationHandleSumTest, IntTypeColumnVectorTest) { - checkAggregationSumGenericColumnVector<IntType, LongType>(); -} - -TEST_F(AggregationHandleSumTest, LongTypeColumnVectorTest) { - checkAggregationSumGenericColumnVector<LongType, LongType>(); -} - -TEST_F(AggregationHandleSumTest, FloatTypeColumnVectorTest) { - checkAggregationSumGenericColumnVector<FloatType, DoubleType>(); -} - -TEST_F(AggregationHandleSumTest, DoubleTypeColumnVectorTest) { - checkAggregationSumGenericColumnVector<DoubleType, DoubleType>(); -} - -TEST_F(AggregationHandleSumTest, DatetimeIntervalTypeColumnVectorTest) { - checkAggregationSumGenericColumnVector<DatetimeIntervalType, - DatetimeIntervalType>(); -} - -TEST_F(AggregationHandleSumTest, YearMonthIntervalTypeColumnVectorTest) { - checkAggregationSumGenericColumnVector<YearMonthIntervalType, - YearMonthIntervalType>(); -} - -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION TEST_F(AggregationHandleSumTest, IntTypeValueAccessorTest) { checkAggregationSumGenericValueAccessor<IntType, LongType>(); } @@ -358,7 +299,6 @@ TEST_F(AggregationHandleSumTest, YearMonthIntervalTypeValueAccessorTest) { checkAggregationSumGenericValueAccessor<YearMonthIntervalType, YearMonthIntervalType>(); } -#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION #ifdef QUICKSTEP_DEBUG TEST_F(AggregationHandleSumDeathTest, CharTypeTest) { @@ -464,28 +404,25 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) { 
initializeHandle(long_non_null_type); storage_manager_.reset(new StorageManager("./test_sum_data")); std::unique_ptr<AggregationStateHashTableBase> source_hash_table( - AggregationStateFastHashTableFactory::CreateResizable( + AggregationStateHashTableFactory::CreateResizable( HashTableImplType::kSeparateChaining, std::vector<const Type *>(1, &long_non_null_type), 10, - {aggregation_handle_sum_.get()->getPayloadSize()}, {aggregation_handle_sum_.get()}, storage_manager_.get())); std::unique_ptr<AggregationStateHashTableBase> destination_hash_table( - AggregationStateFastHashTableFactory::CreateResizable( + AggregationStateHashTableFactory::CreateResizable( HashTableImplType::kSeparateChaining, std::vector<const Type *>(1, &long_non_null_type), 10, - {aggregation_handle_sum_.get()->getPayloadSize()}, {aggregation_handle_sum_.get()}, storage_manager_.get())); - AggregationStateFastHashTable *destination_hash_table_derived = - static_cast<AggregationStateFastHashTable *>( - destination_hash_table.get()); + PackedPayloadHashTable *destination_hash_table_derived = + static_cast<PackedPayloadHashTable *>(destination_hash_table.get()); - AggregationStateFastHashTable *source_hash_table_derived = - static_cast<AggregationStateFastHashTable *>(source_hash_table.get()); + PackedPayloadHashTable *source_hash_table_derived = + static_cast<PackedPayloadHashTable *>(source_hash_table.get()); AggregationHandleSum *aggregation_handle_sum_derived = static_cast<AggregationHandleSum *>(aggregation_handle_sum_.get()); @@ -563,49 +500,47 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) { memcpy(buffer + 1, common_key_source_state.get()->getPayloadAddress(), aggregation_handle_sum_.get()->getPayloadSize()); - source_hash_table_derived->putCompositeKey(common_key, buffer); + source_hash_table_derived->upsertCompositeKey(common_key, buffer); memcpy(buffer + 1, common_key_destination_state.get()->getPayloadAddress(), aggregation_handle_sum_.get()->getPayloadSize()); - 
destination_hash_table_derived->putCompositeKey(common_key, buffer); + destination_hash_table_derived->upsertCompositeKey(common_key, buffer); memcpy(buffer + 1, exclusive_key_source_state.get()->getPayloadAddress(), aggregation_handle_sum_.get()->getPayloadSize()); - source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer); + source_hash_table_derived->upsertCompositeKey(exclusive_source_key, buffer); memcpy(buffer + 1, exclusive_key_destination_state.get()->getPayloadAddress(), aggregation_handle_sum_.get()->getPayloadSize()); - destination_hash_table_derived->putCompositeKey(exclusive_destination_key, - buffer); + destination_hash_table_derived->upsertCompositeKey(exclusive_destination_key, + buffer); EXPECT_EQ(2u, destination_hash_table_derived->numEntries()); EXPECT_EQ(2u, source_hash_table_derived->numEntries()); - AggregationOperationState::mergeGroupByHashTables( - source_hash_table.get(), destination_hash_table.get()); + HashTableMerger merger(destination_hash_table_derived); + source_hash_table_derived->forEachCompositeKey(&merger); EXPECT_EQ(3u, destination_hash_table_derived->numEntries()); CheckSumValue<std::int64_t>( common_key_merged_val.getLiteral<std::int64_t>(), - aggregation_handle_sum_derived->finalizeHashTableEntryFast( + aggregation_handle_sum_derived->finalizeHashTableEntry( destination_hash_table_derived->getSingleCompositeKey(common_key) + 1)); CheckSumValue<std::int64_t>( exclusive_key_destination_sum_val.getLiteral<std::int64_t>(), - aggregation_handle_sum_derived->finalizeHashTableEntryFast( + aggregation_handle_sum_derived->finalizeHashTableEntry( destination_hash_table_derived->getSingleCompositeKey( - exclusive_destination_key) + - 1)); + exclusive_destination_key) + 1)); CheckSumValue<std::int64_t>( exclusive_key_source_sum_val.getLiteral<std::int64_t>(), - aggregation_handle_sum_derived->finalizeHashTableEntryFast( + aggregation_handle_sum_derived->finalizeHashTableEntry( 
source_hash_table_derived->getSingleCompositeKey( - exclusive_source_key) + - 1)); + exclusive_source_key) + 1)); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7285f907/query_execution/QueryContext.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp index 895c2ea..ed0f99c 100644 --- a/query_execution/QueryContext.hpp +++ b/query_execution/QueryContext.hpp @@ -200,20 +200,6 @@ class QueryContext { } /** - * @brief Destroy the payloads from the aggregation hash tables. - * - * @warning After calling these methods, the hash table will be in an invalid - * state. No other operation should be performed on them. - * - * @param id The ID of the AggregationOperationState. - **/ - inline void destroyAggregationHashTablePayload(const aggregation_state_id id) { - DCHECK_LT(id, aggregation_states_.size()); - DCHECK(aggregation_states_[id]); - aggregation_states_[id]->destroyAggregationHashTablePayload(); - } - - /** * @brief Whether the given GeneratorFunctionHandle id is valid. * * @param id The GeneratorFunctionHandle id. 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7285f907/query_optimizer/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt index 7f90e11..a755832 100644 --- a/query_optimizer/CMakeLists.txt +++ b/query_optimizer/CMakeLists.txt @@ -64,6 +64,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_expressions_Expressions_proto quickstep_expressions_aggregation_AggregateFunction quickstep_expressions_aggregation_AggregateFunction_proto + quickstep_expressions_aggregation_AggregationID quickstep_expressions_predicate_Predicate quickstep_expressions_scalar_Scalar quickstep_expressions_scalar_ScalarAttribute @@ -125,6 +126,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_relationaloperators_DropTableOperator quickstep_relationaloperators_FinalizeAggregationOperator quickstep_relationaloperators_HashJoinOperator + quickstep_relationaloperators_InitializeAggregationOperator quickstep_relationaloperators_InsertOperator quickstep_relationaloperators_NestedLoopsJoinOperator quickstep_relationaloperators_RelationalOperator @@ -145,6 +147,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_storage_StorageBlockLayout_proto quickstep_storage_SubBlockTypeRegistry quickstep_types_Type + quickstep_types_TypeID quickstep_types_Type_proto quickstep_types_TypedValue quickstep_types_TypedValue_proto http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7285f907/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index 6918313..67c8739 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -49,6 +49,7 @@ #include "expressions/Expressions.pb.h" #include 
"expressions/aggregation/AggregateFunction.hpp" #include "expressions/aggregation/AggregateFunction.pb.h" +#include "expressions/aggregation/AggregationID.hpp" #include "expressions/predicate/Predicate.hpp" #include "expressions/scalar/Scalar.hpp" #include "expressions/scalar/ScalarAttribute.hpp" @@ -105,6 +106,7 @@ #include "relational_operators/DropTableOperator.hpp" #include "relational_operators/FinalizeAggregationOperator.hpp" #include "relational_operators/HashJoinOperator.hpp" +#include "relational_operators/InitializeAggregationOperator.hpp" #include "relational_operators/InsertOperator.hpp" #include "relational_operators/NestedLoopsJoinOperator.hpp" #include "relational_operators/RelationalOperator.hpp" @@ -126,6 +128,7 @@ #include "storage/SubBlockTypeRegistry.hpp" #include "types/Type.hpp" #include "types/Type.pb.h" +#include "types/TypeID.hpp" #include "types/TypedValue.hpp" #include "types/TypedValue.pb.h" #include "types/containers/Tuple.pb.h" @@ -371,6 +374,110 @@ void ExecutionGenerator::dropAllTemporaryRelations() { } } +bool ExecutionGenerator::canUseCollisionFreeAggregation( + const P::AggregatePtr &aggregate, + const std::size_t estimated_num_groups, + std::size_t *max_num_groups) const { +#ifdef QUICKSTEP_DISTRIBUTED + // Currently we cannot do this fast path with the distributed setting. See + // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and + // FinalizeAggregationOperator::getAllWorkOrderProtos(). + return false; +#endif + + // Supports only single group-by key. + if (aggregate->grouping_expressions().size() != 1) { + return false; + } + + // We need to know the exact min/max stats of the group-by key. + // So it must be a CatalogAttribute (but not an expression). 
+ E::AttributeReferencePtr group_by_key_attr; + const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front(); + if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) { + return false; + } + + bool min_value_stat_is_exact; + bool max_value_stat_is_exact; + const TypedValue min_value = + cost_model_for_aggregation_->findMinValueStat( + aggregate, group_by_key_attr, &min_value_stat_is_exact); + const TypedValue max_value = + cost_model_for_aggregation_->findMaxValueStat( + aggregate, group_by_key_attr, &max_value_stat_is_exact); + if (min_value.isNull() || max_value.isNull() || + (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) { + return false; + } + + std::int64_t min_cpp_value; + std::int64_t max_cpp_value; + switch (group_by_key_attr->getValueType().getTypeID()) { + case TypeID::kInt: { + min_cpp_value = min_value.getLiteral<int>(); + max_cpp_value = max_value.getLiteral<int>(); + break; + } + case TypeID::kLong: { + min_cpp_value = min_value.getLiteral<std::int64_t>(); + max_cpp_value = max_value.getLiteral<std::int64_t>(); + break; + } + default: + return false; + } + + // TODO(jianqiao): + // 1. Handle the case where min_cpp_value is below 0 or far greater than 0. + // 2. Reason about the upbound (e.g. by checking memory size) instead of + // hardcoding it here. + const std::int64_t kGroupSizeUpbound = 1000000000; + if (min_cpp_value < 0 || + max_cpp_value > kGroupSizeUpbound || + max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) { + return false; + } + + for (const auto &agg_expr : aggregate->aggregate_expressions()) { + const E::AggregateFunctionPtr agg_func = + std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression()); + + if (agg_func->is_distinct()) { + return false; + } + + // TODO(jianqiao): Support AggregationID::AVG. 
+ switch (agg_func->getAggregate().getAggregationID()) { + case AggregationID::kCount: // Fall through + case AggregationID::kSum: + break; + default: + return false; + } + + const auto &arguments = agg_func->getArguments(); + if (arguments.size() > 1) { + return false; + } + + if (arguments.size() == 1) { + switch (arguments.front()->getValueType().getTypeID()) { + case TypeID::kInt: // Fall through + case TypeID::kLong: + case TypeID::kFloat: + case TypeID::kDouble: + break; + default: + return false; + } + } + } + + *max_num_groups = static_cast<std::size_t>(max_cpp_value) + 1; + return true; +} + void ExecutionGenerator::convertNamedExpressions( const std::vector<E::NamedExpressionPtr> &named_expressions, S::QueryContext::ScalarGroup *scalar_group_proto) { @@ -1475,6 +1582,8 @@ void ExecutionGenerator::convertAggregate( findRelationInfoOutputByPhysical(physical_plan->input()); aggr_state_proto->set_relation_id(input_relation_info->relation->getID()); + bool use_parallel_initialization = false; + std::vector<const Type*> group_by_types; for (const E::NamedExpressionPtr &grouping_expression : physical_plan->grouping_expressions()) { unique_ptr<const Scalar> execution_group_by_expression; @@ -1495,9 +1604,28 @@ void ExecutionGenerator::convertAggregate( } if (!group_by_types.empty()) { - // Right now, only SeparateChaining is supported. 
- aggr_state_proto->set_hash_table_impl_type( - serialization::HashTableImplType::SEPARATE_CHAINING); + const std::size_t estimated_num_groups = + cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan); + + std::size_t max_num_groups; + const bool can_use_collision_free_aggregation = + canUseCollisionFreeAggregation(physical_plan, + estimated_num_groups, + &max_num_groups); + + if (can_use_collision_free_aggregation) { + aggr_state_proto->set_hash_table_impl_type( + serialization::HashTableImplType::COLLISION_FREE_VECTOR); + aggr_state_proto->set_estimated_num_entries(max_num_groups); + use_parallel_initialization = true; + } else { + // Otherwise, use SeparateChaining. + aggr_state_proto->set_hash_table_impl_type( + serialization::HashTableImplType::SEPARATE_CHAINING); + aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups)); + } + } else { + aggr_state_proto->set_estimated_num_entries(1uL); } for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) { @@ -1535,10 +1663,6 @@ void ExecutionGenerator::convertAggregate( aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto()); } - const std::size_t estimated_num_groups = - cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan); - aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups)); - const QueryPlan::DAGNodeIndex aggregation_operator_index = execution_plan_->addRelationalOperator( new AggregationOperator( @@ -1553,6 +1677,18 @@ void ExecutionGenerator::convertAggregate( false /* is_pipeline_breaker */); } + if (use_parallel_initialization) { + const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index = + execution_plan_->addRelationalOperator( + new InitializeAggregationOperator( + query_handle_->query_id(), + aggr_state_index)); + + execution_plan_->addDirectDependency(aggregation_operator_index, + initialize_aggregation_operator_index, + true /* 
is_pipeline_breaker */); + } + // Create InsertDestination proto. const CatalogRelation *output_relation = nullptr; const QueryContext::insert_destination_id insert_destination_index = http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7285f907/query_optimizer/ExecutionGenerator.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp index eba6eee..987f11a 100644 --- a/query_optimizer/ExecutionGenerator.hpp +++ b/query_optimizer/ExecutionGenerator.hpp @@ -20,6 +20,7 @@ #ifndef QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_GENERATOR_HPP_ #define QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_GENERATOR_HPP_ +#include <cstddef> #include <memory> #include <string> #include <unordered_map> @@ -37,6 +38,7 @@ #include "query_optimizer/QueryHandle.hpp" #include "query_optimizer/QueryPlan.hpp" #include "query_optimizer/cost_model/CostModel.hpp" +#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp" #include "query_optimizer/expressions/ExprId.hpp" #include "query_optimizer/expressions/NamedExpression.hpp" #include "query_optimizer/expressions/Predicate.hpp" @@ -204,6 +206,22 @@ class ExecutionGenerator { std::string getNewRelationName(); /** + * @brief Checks whether an aggregate node can be efficiently evaluated with + * the collision-free aggregation fast path. + * + * @param aggregate The physical aggregate node to be checked. + * @param estimated_num_groups The estimated number of groups for the aggregate. + * @param max_num_groups If collision-free aggregation is applicable, the + * pointed content of this pointer will be set as the maximum possible + * number of groups that the collision-free hash table needs to hold. + * @return A bool value indicating whether collision-free aggregation can be + * used to evaluate \p aggregate.
+ */ + bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate, + const std::size_t estimated_num_groups, + std::size_t *max_num_groups) const; + + /** * @brief Sets up the info of the CatalogRelation represented by TableReference. * TableReference is not converted to any operator. * @@ -427,7 +445,7 @@ class ExecutionGenerator { /** * @brief The cost model to use for estimating aggregation hash table size. */ - std::unique_ptr<cost::CostModel> cost_model_for_aggregation_; + std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_for_aggregation_; /** * @brief The cost model to use for estimating join hash table size. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7285f907/relational_operators/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt index c18dc77..df4114d 100644 --- a/relational_operators/CMakeLists.txt +++ b/relational_operators/CMakeLists.txt @@ -47,6 +47,9 @@ add_library(quickstep_relationaloperators_FinalizeAggregationOperator FinalizeAggregationOperator.cpp FinalizeAggregationOperator.hpp) add_library(quickstep_relationaloperators_HashJoinOperator HashJoinOperator.cpp HashJoinOperator.hpp) +add_library(quickstep_relationaloperators_InitializeAggregationOperator + InitializeAggregationOperator.cpp + InitializeAggregationOperator.hpp) add_library(quickstep_relationaloperators_InsertOperator InsertOperator.cpp InsertOperator.hpp) add_library(quickstep_relationaloperators_NestedLoopsJoinOperator NestedLoopsJoinOperator.cpp @@ -254,6 +257,17 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator quickstep_utility_lipfilter_LIPFilterAdaptiveProber quickstep_utility_lipfilter_LIPFilterUtil tmb) +target_link_libraries(quickstep_relationaloperators_InitializeAggregationOperator + glog + quickstep_queryexecution_QueryContext + quickstep_queryexecution_WorkOrderProtosContainer + 
quickstep_queryexecution_WorkOrdersContainer + quickstep_relationaloperators_RelationalOperator + quickstep_relationaloperators_WorkOrder + quickstep_relationaloperators_WorkOrder_proto + quickstep_storage_AggregationOperationState + quickstep_utility_Macros + tmb) target_link_libraries(quickstep_relationaloperators_InsertOperator glog quickstep_catalog_CatalogRelation @@ -548,6 +562,7 @@ target_link_libraries(quickstep_relationaloperators quickstep_relationaloperators_DropTableOperator quickstep_relationaloperators_FinalizeAggregationOperator quickstep_relationaloperators_HashJoinOperator + quickstep_relationaloperators_InitializeAggregationOperator quickstep_relationaloperators_InsertOperator quickstep_relationaloperators_NestedLoopsJoinOperator quickstep_relationaloperators_RebuildWorkOrder http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7285f907/relational_operators/DestroyAggregationStateOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/DestroyAggregationStateOperator.cpp b/relational_operators/DestroyAggregationStateOperator.cpp index 49be43d..62ca9e7 100644 --- a/relational_operators/DestroyAggregationStateOperator.cpp +++ b/relational_operators/DestroyAggregationStateOperator.cpp @@ -58,13 +58,6 @@ bool DestroyAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosConta } void DestroyAggregationStateWorkOrder::execute() { - // NOTE(harshad) : The destroyAggregationHashTablePayload call is separate - // from the destroyAggregationState call. The reason is that the aggregation - // hash tables don't own the AggregationHandle objects. However the hash table - // class requires the handles for destroying the payload (see the - // destroyPayload methods in AggregationHandle classes). Therefore, we first - // destroy the payloads in the hash table and then destroy the hash table. 
- query_context_->destroyAggregationHashTablePayload(aggr_state_index_); query_context_->destroyAggregationState(aggr_state_index_); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7285f907/relational_operators/FinalizeAggregationOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp index 0cbf635..77b4b97 100644 --- a/relational_operators/FinalizeAggregationOperator.cpp +++ b/relational_operators/FinalizeAggregationOperator.cpp @@ -44,15 +44,15 @@ bool FinalizeAggregationOperator::getAllWorkOrders( AggregationOperationState *agg_state = query_context->getAggregationState(aggr_state_index_); DCHECK(agg_state != nullptr); - for (int part_id = 0; - part_id < static_cast<int>(agg_state->getNumPartitions()); + for (std::size_t part_id = 0; + part_id < agg_state->getNumFinalizationPartitions(); ++part_id) { container->addNormalWorkOrder( new FinalizeAggregationWorkOrder( query_id_, + part_id, agg_state, - query_context->getInsertDestination(output_destination_index_), - part_id), + query_context->getInsertDestination(output_destination_index_)), op_index_); } } @@ -61,7 +61,7 @@ bool FinalizeAggregationOperator::getAllWorkOrders( // TODO(quickstep-team) : Think about how the number of partitions could be // accessed in this function. Until then, we can't use partitioned aggregation -// with the distributed version. +// finalization with the distributed version. 
bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { if (blocking_dependencies_met_ && !started_) { started_ = true; @@ -80,11 +80,7 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer } void FinalizeAggregationWorkOrder::execute() { - if (state_->isAggregatePartitioned()) { - state_->finalizeAggregatePartitioned(part_id_, output_destination_); - } else { - state_->finalizeAggregate(output_destination_); - } + state_->finalizeAggregate(partition_id_, output_destination_); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7285f907/relational_operators/FinalizeAggregationOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp index ae7127a..3c209b1 100644 --- a/relational_operators/FinalizeAggregationOperator.hpp +++ b/relational_operators/FinalizeAggregationOperator.hpp @@ -116,29 +116,29 @@ class FinalizeAggregationWorkOrder : public WorkOrder { * @note InsertWorkOrder takes ownership of \c state. * * @param query_id The ID of the query to which this operator belongs. + * @param partition_id The partition ID for which the Finalize aggregation + * work order is issued. * @param state The AggregationState to use. * @param output_destination The InsertDestination to insert aggregation * results. - * @param part_id The partition ID for which the Finalize aggregation work - * order is issued. Ignore if aggregation is not partitioned. 
*/ FinalizeAggregationWorkOrder(const std::size_t query_id, + const std::size_t partition_id, AggregationOperationState *state, - InsertDestination *output_destination, - const int part_id = -1) + InsertDestination *output_destination) : WorkOrder(query_id), + partition_id_(partition_id), state_(DCHECK_NOTNULL(state)), - output_destination_(DCHECK_NOTNULL(output_destination)), - part_id_(part_id) {} + output_destination_(DCHECK_NOTNULL(output_destination)) {} ~FinalizeAggregationWorkOrder() override {} void execute() override; private: + const std::size_t partition_id_; AggregationOperationState *state_; InsertDestination *output_destination_; - const int part_id_; DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7285f907/relational_operators/InitializeAggregationOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp new file mode 100644 index 0000000..b1063ad --- /dev/null +++ b/relational_operators/InitializeAggregationOperator.cpp @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#include "relational_operators/InitializeAggregationOperator.hpp" + +#include <cstddef> + +#include "query_execution/QueryContext.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" +#include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" +#include "storage/AggregationOperationState.hpp" + +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" + +namespace quickstep { + +bool InitializeAggregationOperator::getAllWorkOrders( + WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus) { + if (!started_) { + AggregationOperationState *agg_state = + query_context->getAggregationState(aggr_state_index_); + DCHECK(agg_state != nullptr); + + for (std::size_t part_id = 0; + part_id < agg_state->getNumInitializationPartitions(); + ++part_id) { + container->addNormalWorkOrder( + new InitializeAggregationWorkOrder(query_id_, + part_id, + agg_state), + op_index_); + } + started_ = true; + } + return true; +} + +// TODO(quickstep-team) : Think about how the number of partitions could be +// accessed in this function. Until then, we can't use partitioned aggregation +// initialization with the distributed version. 
+bool InitializeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + LOG(FATAL) << "Not supported"; +} + +void InitializeAggregationWorkOrder::execute() { + state_->initialize(partition_id_); +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7285f907/relational_operators/InitializeAggregationOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/InitializeAggregationOperator.hpp b/relational_operators/InitializeAggregationOperator.hpp new file mode 100644 index 0000000..58d848b --- /dev/null +++ b/relational_operators/InitializeAggregationOperator.hpp @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/ + +#ifndef QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_OPERATOR_HPP_ +#define QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_OPERATOR_HPP_ + +#include <string> + +#include "query_execution/QueryContext.hpp" +#include "relational_operators/RelationalOperator.hpp" +#include "relational_operators/WorkOrder.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" + +namespace tmb { class MessageBus; } + +namespace quickstep { + +class AggregationOperationState; +class StorageManager; +class WorkOrderProtosContainer; +class WorkOrdersContainer; + +namespace serialization { class WorkOrder; } + +/** \addtogroup RelationalOperators + * @{ + */ + +/** + * @brief An operator which initializes an AggregationOperationState. + **/ +class InitializeAggregationOperator : public RelationalOperator { + public: + /** + * @brief Constructor. + * + * @param query_id The ID of this query. + * @param aggr_state_index The index of the AggregationOperationState in QueryContext. + **/ + InitializeAggregationOperator(const std::size_t query_id, + const QueryContext::aggregation_state_id aggr_state_index) + : RelationalOperator(query_id), + aggr_state_index_(aggr_state_index), + started_(false) {} + + ~InitializeAggregationOperator() override {} + + std::string getName() const override { + return "InitializeAggregationOperator"; + } + + bool getAllWorkOrders(WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus) override; + + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + + private: + const QueryContext::aggregation_state_id aggr_state_index_; + bool started_; + + DISALLOW_COPY_AND_ASSIGN(InitializeAggregationOperator); +}; + +/** + * @brief A WorkOrder produced by InitializeAggregationOperator. 
+ **/ +class InitializeAggregationWorkOrder : public WorkOrder { + public: + /** + * @brief Constructor. + * + * @param query_id The ID of the query to which this operator belongs. + * @param partition_id The partition ID for which the work order is issued. + * @param state The AggregationOperationState to be initialized. + */ + InitializeAggregationWorkOrder(const std::size_t query_id, + const std::size_t partition_id, + AggregationOperationState *state) + : WorkOrder(query_id), + partition_id_(partition_id), + state_(DCHECK_NOTNULL(state)) {} + + ~InitializeAggregationWorkOrder() override {} + + void execute() override; + + private: + const std::size_t partition_id_; + + AggregationOperationState *state_; + + DISALLOW_COPY_AND_ASSIGN(InitializeAggregationWorkOrder); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_OPERATOR_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/7285f907/relational_operators/WorkOrderFactory.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp index 5e8d03d..306bd1a 100644 --- a/relational_operators/WorkOrderFactory.cpp +++ b/relational_operators/WorkOrderFactory.cpp @@ -186,6 +186,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder LOG(INFO) << "Creating FinalizeAggregationWorkOrder in Shiftboss " << shiftboss_index; return new FinalizeAggregationWorkOrder( proto.query_id(), + 0uL, query_context->getAggregationState(proto.GetExtension( serialization::FinalizeAggregationWorkOrder::aggr_state_index)), query_context->getInsertDestination(