Fuse Aggregate with LeftOuterJoin to accelerate evaluation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ccd707b3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ccd707b3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ccd707b3 Branch: refs/heads/aggregate-on-left-outer-join Commit: ccd707b3b2cb4655739d3cd0de8c2b62e2041cf5 Parents: 712ffd1 Author: Jianqiao Zhu <jianq...@cs.wisc.edu> Authored: Mon Jan 30 14:46:39 2017 -0600 Committer: Jianqiao Zhu <jianq...@cs.wisc.edu> Committed: Wed Feb 8 13:43:33 2017 -0600 ---------------------------------------------------------------------- query_optimizer/CMakeLists.txt | 6 +- query_optimizer/ExecutionGenerator.cpp | 261 +++++++++++-------- query_optimizer/ExecutionGenerator.hpp | 20 +- query_optimizer/PhysicalGenerator.cpp | 3 + query_optimizer/cost_model/CMakeLists.txt | 7 + query_optimizer/cost_model/SimpleCostModel.cpp | 9 + query_optimizer/cost_model/SimpleCostModel.hpp | 5 + .../cost_model/StarSchemaSimpleCostModel.cpp | 147 ++++++++++- .../cost_model/StarSchemaSimpleCostModel.hpp | 20 ++ query_optimizer/physical/CMakeLists.txt | 14 + .../CrossReferenceCoalesceAggregate.cpp | 105 ++++++++ .../CrossReferenceCoalesceAggregate.hpp | 176 +++++++++++++ query_optimizer/physical/PatternMatcher.hpp | 3 + query_optimizer/physical/PhysicalType.hpp | 1 + query_optimizer/rules/BottomUpRule.hpp | 39 +-- query_optimizer/rules/CMakeLists.txt | 23 ++ query_optimizer/rules/FuseAggregateJoin.cpp | 164 ++++++++++++ query_optimizer/rules/FuseAggregateJoin.hpp | 69 +++++ .../BuildAggregationExistenceMapOperator.cpp | 148 +++++++++++ .../BuildAggregationExistenceMapOperator.hpp | 146 +++++++++++ relational_operators/CMakeLists.txt | 25 ++ storage/AggregationOperationState.cpp | 13 +- storage/AggregationOperationState.hpp | 3 + storage/CollisionFreeVectorTable.hpp | 4 + utility/lip_filter/BitVectorExactFilter.hpp | 26 +- utility/lip_filter/CMakeLists.txt | 
10 +- utility/lip_filter/SingleIdentityHashFilter.hpp | 20 +- 27 files changed, 1285 insertions(+), 182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt index e750a1e..3ff783c 100644 --- a/query_optimizer/CMakeLists.txt +++ b/query_optimizer/CMakeLists.txt @@ -64,7 +64,6 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_expressions_Expressions_proto quickstep_expressions_aggregation_AggregateFunction quickstep_expressions_aggregation_AggregateFunction_proto - quickstep_expressions_aggregation_AggregationID quickstep_expressions_predicate_Predicate quickstep_expressions_scalar_Scalar quickstep_expressions_scalar_ScalarAttribute @@ -95,6 +94,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_queryoptimizer_physical_CopyFrom quickstep_queryoptimizer_physical_CreateIndex quickstep_queryoptimizer_physical_CreateTable + quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate quickstep_queryoptimizer_physical_DeleteTuples quickstep_queryoptimizer_physical_DropTable quickstep_queryoptimizer_physical_FilterJoin @@ -116,6 +116,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_queryoptimizer_physical_UpdateTable quickstep_queryoptimizer_physical_WindowAggregate quickstep_relationaloperators_AggregationOperator + quickstep_relationaloperators_BuildAggregationExistenceMapOperator quickstep_relationaloperators_BuildHashOperator quickstep_relationaloperators_BuildLIPFilterOperator quickstep_relationaloperators_CreateIndexOperator @@ -147,12 +148,10 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator quickstep_storage_StorageBlockLayout_proto quickstep_storage_SubBlockTypeRegistry 
quickstep_types_Type - quickstep_types_TypeID quickstep_types_Type_proto quickstep_types_TypedValue quickstep_types_TypedValue_proto quickstep_types_containers_Tuple_proto - quickstep_utility_EqualsAnyConstant quickstep_utility_Macros quickstep_utility_SqlError) if (ENABLE_DISTRIBUTED) @@ -213,6 +212,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator quickstep_queryoptimizer_logical_Logical quickstep_queryoptimizer_physical_Physical quickstep_queryoptimizer_rules_AttachLIPFilters + quickstep_queryoptimizer_rules_FuseAggregateJoin quickstep_queryoptimizer_rules_InjectJoinFilters quickstep_queryoptimizer_rules_PruneColumns quickstep_queryoptimizer_rules_PushDownLowCostDisjunctivePredicate http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index 1b50caa..1b41cdd 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -49,7 +49,6 @@ #include "expressions/Expressions.pb.h" #include "expressions/aggregation/AggregateFunction.hpp" #include "expressions/aggregation/AggregateFunction.pb.h" -#include "expressions/aggregation/AggregationID.hpp" #include "expressions/predicate/Predicate.hpp" #include "expressions/scalar/Scalar.hpp" #include "expressions/scalar/ScalarAttribute.hpp" @@ -72,9 +71,11 @@ #include "query_optimizer/expressions/Scalar.hpp" #include "query_optimizer/expressions/ScalarLiteral.hpp" #include "query_optimizer/expressions/WindowAggregateFunction.hpp" +#include "query_optimizer/physical/Aggregate.hpp" #include "query_optimizer/physical/CopyFrom.hpp" #include "query_optimizer/physical/CreateIndex.hpp" #include "query_optimizer/physical/CreateTable.hpp" +#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp" #include "query_optimizer/physical/DeleteTuples.hpp" 
#include "query_optimizer/physical/DropTable.hpp" #include "query_optimizer/physical/FilterJoin.hpp" @@ -96,6 +97,7 @@ #include "query_optimizer/physical/UpdateTable.hpp" #include "query_optimizer/physical/WindowAggregate.hpp" #include "relational_operators/AggregationOperator.hpp" +#include "relational_operators/BuildAggregationExistenceMapOperator.hpp" #include "relational_operators/BuildHashOperator.hpp" #include "relational_operators/BuildLIPFilterOperator.hpp" #include "relational_operators/CreateIndexOperator.hpp" @@ -128,11 +130,9 @@ #include "storage/SubBlockTypeRegistry.hpp" #include "types/Type.hpp" #include "types/Type.pb.h" -#include "types/TypeID.hpp" #include "types/TypedValue.hpp" #include "types/TypedValue.pb.h" #include "types/containers/Tuple.pb.h" -#include "utility/EqualsAnyConstant.hpp" #include "utility/SqlError.hpp" #include "gflags/gflags.h" @@ -163,10 +163,6 @@ static const volatile bool aggregate_hashtable_type_dummy DEFINE_bool(parallelize_load, true, "Parallelize loading data files."); -DEFINE_int64(collision_free_vector_table_max_size, 1000000000, - "The maximum allowed key range (number of entries) for using a " - "CollisionFreeVectorTable."); - namespace E = ::quickstep::optimizer::expressions; namespace P = ::quickstep::optimizer::physical; namespace S = ::quickstep::serialization; @@ -266,6 +262,9 @@ void ExecutionGenerator::generatePlanInternal( case P::PhysicalType::kAggregate: return convertAggregate( std::static_pointer_cast<const P::Aggregate>(physical_plan)); + case P::PhysicalType::kCrossReferenceCoalesceAggregate: + return convertCrossReferenceCoalesceAggregate( + std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan)); case P::PhysicalType::kCopyFrom: return convertCopyFrom( std::static_pointer_cast<const P::CopyFrom>(physical_plan)); @@ -379,105 +378,6 @@ void ExecutionGenerator::dropAllTemporaryRelations() { } } -bool ExecutionGenerator::canUseCollisionFreeAggregation( - const P::AggregatePtr 
&aggregate, - const std::size_t estimated_num_groups, - std::size_t *max_num_groups) const { -#ifdef QUICKSTEP_DISTRIBUTED - // Currently we cannot do this fast path with the distributed setting. See - // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and - // FinalizeAggregationOperator::getAllWorkOrderProtos(). - return false; -#endif - - // Supports only single group-by key. - if (aggregate->grouping_expressions().size() != 1) { - return false; - } - - // We need to know the exact min/max stats of the group-by key. - // So it must be a CatalogAttribute (but not an expression). - E::AttributeReferencePtr group_by_key_attr; - const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front(); - if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) { - return false; - } - - bool min_value_stat_is_exact; - bool max_value_stat_is_exact; - const TypedValue min_value = - cost_model_for_aggregation_->findMinValueStat( - aggregate, group_by_key_attr, &min_value_stat_is_exact); - const TypedValue max_value = - cost_model_for_aggregation_->findMaxValueStat( - aggregate, group_by_key_attr, &max_value_stat_is_exact); - if (min_value.isNull() || max_value.isNull() || - (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) { - return false; - } - - std::int64_t min_cpp_value; - std::int64_t max_cpp_value; - switch (group_by_key_attr->getValueType().getTypeID()) { - case TypeID::kInt: { - min_cpp_value = min_value.getLiteral<int>(); - max_cpp_value = max_value.getLiteral<int>(); - break; - } - case TypeID::kLong: { - min_cpp_value = min_value.getLiteral<std::int64_t>(); - max_cpp_value = max_value.getLiteral<std::int64_t>(); - break; - } - default: - return false; - } - - // TODO(jianqiao): - // 1. Handle the case where min_cpp_value is below 0 or far greater than 0. - // 2. Reason about the table size bound (e.g. by checking memory size) instead - // of hardcoding it as a gflag. 
- if (min_cpp_value < 0 || - max_cpp_value >= FLAGS_collision_free_vector_table_max_size || - max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) { - return false; - } - - for (const auto &agg_expr : aggregate->aggregate_expressions()) { - const E::AggregateFunctionPtr agg_func = - std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression()); - - if (agg_func->is_distinct()) { - return false; - } - - // TODO(jianqiao): Support AggregationID::AVG. - if (!QUICKSTEP_EQUALS_ANY_CONSTANT(agg_func->getAggregate().getAggregationID(), - AggregationID::kCount, - AggregationID::kSum)) { - return false; - } - - const auto &arguments = agg_func->getArguments(); - if (arguments.size() > 1u) { - return false; - } - - if (arguments.size() == 1u) { - if (!QUICKSTEP_EQUALS_ANY_CONSTANT(arguments.front()->getValueType().getTypeID(), - TypeID::kInt, - TypeID::kLong, - TypeID::kFloat, - TypeID::kDouble)) { - return false; - } - } - } - - *max_num_groups = static_cast<std::size_t>(max_cpp_value) + 1; - return true; -} - void ExecutionGenerator::convertNamedExpressions( const std::vector<E::NamedExpressionPtr> &named_expressions, S::QueryContext::ScalarGroup *scalar_group_proto) { @@ -1608,9 +1508,10 @@ void ExecutionGenerator::convertAggregate( cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan); std::size_t max_num_groups; - if (canUseCollisionFreeAggregation(physical_plan, - estimated_num_groups, - &max_num_groups)) { + if (cost_model_for_aggregation_ + ->canUseCollisionFreeAggregation(physical_plan, + estimated_num_groups, + &max_num_groups)) { aggr_state_proto->set_hash_table_impl_type( serialization::HashTableImplType::COLLISION_FREE_VECTOR); aggr_state_proto->set_estimated_num_entries(max_num_groups); @@ -1730,6 +1631,148 @@ void ExecutionGenerator::convertAggregate( } } +void ExecutionGenerator::convertCrossReferenceCoalesceAggregate( + const P::CrossReferenceCoalesceAggregatePtr &physical_plan) { + DCHECK_EQ(1u, 
physical_plan->left_join_attributes().size()); + DCHECK_EQ(1u, physical_plan->right_join_attributes().size()); + + const CatalogRelationInfo *left_relation_info = + findRelationInfoOutputByPhysical(physical_plan->left_child()); + const CatalogRelationInfo *right_relation_info = + findRelationInfoOutputByPhysical(physical_plan->right_child()); + + // Create aggr state proto. + const QueryContext::aggregation_state_id aggr_state_index = + query_context_proto_->aggregation_states_size(); + S::AggregationOperationState *aggr_state_proto = query_context_proto_->add_aggregation_states(); + + aggr_state_proto->set_relation_id(right_relation_info->relation->getID()); + + // Group by the right join attribute. + std::unique_ptr<const Scalar> execution_group_by_expression( + physical_plan->right_join_attributes().front()->concretize( + attribute_substitution_map_)); + aggr_state_proto->add_group_by_expressions()->CopyFrom( + execution_group_by_expression->getProto()); + + aggr_state_proto->set_hash_table_impl_type( + serialization::HashTableImplType::COLLISION_FREE_VECTOR); + aggr_state_proto->set_estimated_num_entries( + physical_plan->hash_table_num_entries()); + + if (physical_plan->right_filter_predicate() != nullptr) { + std::unique_ptr<const Predicate> predicate( + convertPredicate(physical_plan->right_filter_predicate())); + aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto()); + } + + for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) { + const E::AggregateFunctionPtr unnamed_aggregate_expression = + std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression()); + + // Add a new entry in 'aggregates'. + S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates(); + + // Set the AggregateFunction. + aggr_proto->mutable_function()->CopyFrom( + unnamed_aggregate_expression->getAggregate().getProto()); + + // Add each of the aggregate's arguments. 
+ for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) { + unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_)); + aggr_proto->add_argument()->CopyFrom(concretized_argument->getProto()); + } + + // Set whether it is a DISTINCT aggregation. + DCHECK(!unnamed_aggregate_expression->is_distinct()); + aggr_proto->set_is_distinct(false); + } + + const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index = + execution_plan_->addRelationalOperator( + new InitializeAggregationOperator( + query_handle_->query_id(), + aggr_state_index)); + + const QueryPlan::DAGNodeIndex build_aggregation_existence_map_operator_index = + execution_plan_->addRelationalOperator( + new BuildAggregationExistenceMapOperator( + query_handle_->query_id(), + *left_relation_info->relation, + physical_plan->left_join_attributes().front()->id(), + left_relation_info->isStoredRelation(), + aggr_state_index)); + + if (!left_relation_info->isStoredRelation()) { + execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index, + left_relation_info->producer_operator_index, + false /* is_pipeline_breaker */); + } + + const QueryPlan::DAGNodeIndex aggregation_operator_index = + execution_plan_->addRelationalOperator( + new AggregationOperator( + query_handle_->query_id(), + *right_relation_info->relation, + right_relation_info->isStoredRelation(), + aggr_state_index)); + + if (!right_relation_info->isStoredRelation()) { + execution_plan_->addDirectDependency(aggregation_operator_index, + right_relation_info->producer_operator_index, + false /* is_pipeline_breaker */); + } + + // Build aggregation existence map once initialization is done. + execution_plan_->addDirectDependency(build_aggregation_existence_map_operator_index, + initialize_aggregation_operator_index, + true /* is_pipeline_breaker */); + + // Start aggregation after building existence map. 
+ execution_plan_->addDirectDependency(aggregation_operator_index, + build_aggregation_existence_map_operator_index, + true /* is_pipeline_breaker */); + + + // Create InsertDestination proto. + const CatalogRelation *output_relation = nullptr; + const QueryContext::insert_destination_id insert_destination_index = + query_context_proto_->insert_destinations_size(); + S::InsertDestination *insert_destination_proto = query_context_proto_->add_insert_destinations(); + createTemporaryCatalogRelation(physical_plan, + &output_relation, + insert_destination_proto); + + const QueryPlan::DAGNodeIndex finalize_aggregation_operator_index = + execution_plan_->addRelationalOperator( + new FinalizeAggregationOperator(query_handle_->query_id(), + aggr_state_index, + *output_relation, + insert_destination_index)); + + insert_destination_proto->set_relational_op_index(finalize_aggregation_operator_index); + + execution_plan_->addDirectDependency(finalize_aggregation_operator_index, + aggregation_operator_index, + true /* is_pipeline_breaker */); + + physical_to_output_relation_map_.emplace( + std::piecewise_construct, + std::forward_as_tuple(physical_plan), + std::forward_as_tuple(finalize_aggregation_operator_index, output_relation)); + temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index, + output_relation); + + const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index = + execution_plan_->addRelationalOperator( + new DestroyAggregationStateOperator(query_handle_->query_id(), + aggr_state_index)); + + execution_plan_->addDirectDependency(destroy_aggregation_state_operator_index, + finalize_aggregation_operator_index, + true); +} + void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) { // Create sort configuration for run generation. 
vector<bool> sort_ordering(physical_sort->sort_ascending()); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/ExecutionGenerator.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp index 987f11a..f4e614a 100644 --- a/query_optimizer/ExecutionGenerator.hpp +++ b/query_optimizer/ExecutionGenerator.hpp @@ -46,6 +46,7 @@ #include "query_optimizer/physical/CopyFrom.hpp" #include "query_optimizer/physical/CreateIndex.hpp" #include "query_optimizer/physical/CreateTable.hpp" +#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp" #include "query_optimizer/physical/DeleteTuples.hpp" #include "query_optimizer/physical/DropTable.hpp" #include "query_optimizer/physical/FilterJoin.hpp" @@ -206,22 +207,6 @@ class ExecutionGenerator { std::string getNewRelationName(); /** - * @brief Checks whether an aggregate node can be efficiently evaluated with - * the collision-free aggregation fast path. - * - * @param aggregate The physical aggregate node to be checked. - * @param estimated_num_groups The estimated number of groups for the aggregate. - * @param exact_num_groups If collision-free aggregation is applicable, the - * pointed content of this pointer will be set as the maximum possible - * number of groups that the collision-free hash table need to hold. - * @return A bool value indicating whether collision-free aggregation can be - * used to evaluate \p aggregate. - */ - bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate, - const std::size_t estimated_num_groups, - std::size_t *max_num_groups) const; - - /** * @brief Sets up the info of the CatalogRelation represented by TableReference. * TableReference is not converted to any operator. 
* @@ -356,6 +341,9 @@ class ExecutionGenerator { */ void convertAggregate(const physical::AggregatePtr &physical_plan); + void convertCrossReferenceCoalesceAggregate( + const physical::CrossReferenceCoalesceAggregatePtr &physical_plan); + /** * @brief Converts a physical Sort to SortRunGeneration and SortMergeRun. * http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/PhysicalGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp index 1b68f49..ac51c31 100644 --- a/query_optimizer/PhysicalGenerator.cpp +++ b/query_optimizer/PhysicalGenerator.cpp @@ -27,6 +27,7 @@ #include "query_optimizer/logical/Logical.hpp" #include "query_optimizer/physical/Physical.hpp" #include "query_optimizer/rules/AttachLIPFilters.hpp" +#include "query_optimizer/rules/FuseAggregateJoin.hpp" #include "query_optimizer/rules/InjectJoinFilters.hpp" #include "query_optimizer/rules/PruneColumns.hpp" #include "query_optimizer/rules/PushDownLowCostDisjunctivePredicate.hpp" @@ -145,6 +146,8 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() { rules.emplace_back(new ReorderColumns()); } + rules.emplace_back(new FuseAggregateJoin()); + // NOTE(jianqiao): Adding rules after InjectJoinFilters (or AttachLIPFilters) requires // extra handling of LIPFilterConfiguration for transformed nodes. So currently it is // suggested that all the new rules be placed before this point. 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/cost_model/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/cost_model/CMakeLists.txt b/query_optimizer/cost_model/CMakeLists.txt index 5f28bb3..e222fdd 100644 --- a/query_optimizer/cost_model/CMakeLists.txt +++ b/query_optimizer/cost_model/CMakeLists.txt @@ -33,6 +33,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_SimpleCostModel quickstep_catalog_CatalogRelationStatistics quickstep_queryoptimizer_costmodel_CostModel quickstep_queryoptimizer_physical_Aggregate + quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate quickstep_queryoptimizer_physical_FilterJoin quickstep_queryoptimizer_physical_HashJoin quickstep_queryoptimizer_physical_NestedLoopsJoin @@ -51,7 +52,10 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod quickstep_catalog_CatalogRelation quickstep_catalog_CatalogRelationStatistics quickstep_catalog_CatalogTypedefs + quickstep_expressions_aggregation_AggregateFunction + quickstep_expressions_aggregation_AggregationID quickstep_queryoptimizer_costmodel_CostModel + quickstep_queryoptimizer_expressions_AggregateFunction quickstep_queryoptimizer_expressions_AttributeReference quickstep_queryoptimizer_expressions_ComparisonExpression quickstep_queryoptimizer_expressions_ExprId @@ -62,6 +66,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod quickstep_queryoptimizer_expressions_PatternMatcher quickstep_queryoptimizer_expressions_Predicate quickstep_queryoptimizer_physical_Aggregate + quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate quickstep_queryoptimizer_physical_FilterJoin quickstep_queryoptimizer_physical_HashJoin quickstep_queryoptimizer_physical_NestedLoopsJoin @@ -76,7 +81,9 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod 
quickstep_queryoptimizer_physical_TopLevelPlan quickstep_queryoptimizer_physical_WindowAggregate quickstep_types_NullType + quickstep_types_TypeID quickstep_types_TypedValue + quickstep_utility_EqualsAnyConstant quickstep_utility_Macros) # Module all-in-one library: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/cost_model/SimpleCostModel.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/cost_model/SimpleCostModel.cpp b/query_optimizer/cost_model/SimpleCostModel.cpp index e9d2e3a..cfd8a75 100644 --- a/query_optimizer/cost_model/SimpleCostModel.cpp +++ b/query_optimizer/cost_model/SimpleCostModel.cpp @@ -26,6 +26,7 @@ #include "catalog/CatalogRelationStatistics.hpp" #include "query_optimizer/cost_model/CostModel.hpp" #include "query_optimizer/physical/Aggregate.hpp" +#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp" #include "query_optimizer/physical/NestedLoopsJoin.hpp" #include "query_optimizer/physical/FilterJoin.hpp" #include "query_optimizer/physical/HashJoin.hpp" @@ -74,6 +75,9 @@ std::size_t SimpleCostModel::estimateCardinality( case P::PhysicalType::kAggregate: return estimateCardinalityForAggregate( std::static_pointer_cast<const P::Aggregate>(physical_plan)); + case P::PhysicalType::kCrossReferenceCoalesceAggregate: + return estimateCardinalityForCrossReferenceCoalesceAggregate( + std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan)); case P::PhysicalType::kSharedSubplanReference: { const P::SharedSubplanReferencePtr shared_subplan_reference = std::static_pointer_cast<const P::SharedSubplanReference>(physical_plan); @@ -149,6 +153,11 @@ std::size_t SimpleCostModel::estimateCardinalityForAggregate( estimateCardinality(physical_plan->input()) / 10); } +std::size_t SimpleCostModel::estimateCardinalityForCrossReferenceCoalesceAggregate( + const physical::CrossReferenceCoalesceAggregatePtr &physical_plan) { + return 
estimateCardinality(physical_plan->left_child()); +} + std::size_t SimpleCostModel::estimateCardinalityForWindowAggregate( const physical::WindowAggregatePtr &physical_plan) { return estimateCardinality(physical_plan->input()); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/cost_model/SimpleCostModel.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/cost_model/SimpleCostModel.hpp b/query_optimizer/cost_model/SimpleCostModel.hpp index 4edc2fe..0660c37 100644 --- a/query_optimizer/cost_model/SimpleCostModel.hpp +++ b/query_optimizer/cost_model/SimpleCostModel.hpp @@ -25,6 +25,7 @@ #include "query_optimizer/cost_model/CostModel.hpp" #include "query_optimizer/physical/Aggregate.hpp" +#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp" #include "query_optimizer/physical/NestedLoopsJoin.hpp" #include "query_optimizer/physical/FilterJoin.hpp" #include "query_optimizer/physical/HashJoin.hpp" @@ -100,6 +101,10 @@ class SimpleCostModel : public CostModel { std::size_t estimateCardinalityForAggregate( const physical::AggregatePtr &physical_plan); + // Returns the cardinality of the left child plan. + std::size_t estimateCardinalityForCrossReferenceCoalesceAggregate( + const physical::CrossReferenceCoalesceAggregatePtr &physical_plan); + // Return the estimated cardinality of the input plan. 
std::size_t estimateCardinalityForWindowAggregate( const physical::WindowAggregatePtr &physical_plan); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp index 7afa1c3..f546737 100644 --- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp +++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp @@ -20,13 +20,18 @@ #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp" #include <algorithm> +#include <cstddef> +#include <cstdint> #include <memory> #include <vector> #include "catalog/CatalogRelation.hpp" #include "catalog/CatalogRelationStatistics.hpp" #include "catalog/CatalogTypedefs.hpp" +#include "expressions/aggregation/AggregateFunction.hpp" +#include "expressions/aggregation/AggregationID.hpp" #include "query_optimizer/cost_model/CostModel.hpp" +#include "query_optimizer/expressions/AggregateFunction.hpp" #include "query_optimizer/expressions/AttributeReference.hpp" #include "query_optimizer/expressions/ComparisonExpression.hpp" #include "query_optimizer/expressions/ExprId.hpp" @@ -37,6 +42,7 @@ #include "query_optimizer/expressions/Predicate.hpp" #include "query_optimizer/expressions/PatternMatcher.hpp" #include "query_optimizer/physical/Aggregate.hpp" +#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp" #include "query_optimizer/physical/NestedLoopsJoin.hpp" #include "query_optimizer/physical/FilterJoin.hpp" #include "query_optimizer/physical/HashJoin.hpp" @@ -49,8 +55,12 @@ #include "query_optimizer/physical/TableGenerator.hpp" #include "query_optimizer/physical/TableReference.hpp" #include "query_optimizer/physical/TopLevelPlan.hpp" +#include "types/TypeID.hpp" #include "types/TypedValue.hpp" #include "types/NullType.hpp" +#include 
"utility/EqualsAnyConstant.hpp" + +#include "gflags/gflags.h" #include "glog/logging.h" @@ -58,6 +68,10 @@ namespace quickstep { namespace optimizer { namespace cost { +DEFINE_int64(collision_free_vector_table_max_size, 1000000000, + "The maximum allowed key range (number of entries) for using a " + "CollisionFreeVectorTable."); + namespace E = ::quickstep::optimizer::expressions; namespace P = ::quickstep::optimizer::physical; @@ -88,6 +102,9 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinality( case P::PhysicalType::kAggregate: return estimateCardinalityForAggregate( std::static_pointer_cast<const P::Aggregate>(physical_plan)); + case P::PhysicalType::kCrossReferenceCoalesceAggregate: + return estimateCardinalityForCrossReferenceCoalesceAggregate( + std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan)); case P::PhysicalType::kSharedSubplanReference: { const P::SharedSubplanReferencePtr shared_subplan_reference = std::static_pointer_cast<const P::SharedSubplanReference>(physical_plan); @@ -175,6 +192,11 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForAggregate( estimateNumGroupsForAggregate(physical_plan) * filter_selectivity); } +std::size_t StarSchemaSimpleCostModel::estimateCardinalityForCrossReferenceCoalesceAggregate( + const P::CrossReferenceCoalesceAggregatePtr &physical_plan) { + return estimateCardinality(physical_plan->left_child()); +} + std::size_t StarSchemaSimpleCostModel::estimateCardinalityForWindowAggregate( const P::WindowAggregatePtr &physical_plan) { return estimateCardinality(physical_plan->input()); @@ -233,6 +255,13 @@ std::size_t StarSchemaSimpleCostModel::estimateNumDistinctValues( } break; } + case P::PhysicalType::kCrossReferenceCoalesceAggregate: { + const P::PhysicalPtr left_child = physical_plan->children()[0]; + if (E::ContainsExprId(left_child->getOutputAttributes(), attribute_id)) { + return estimateNumDistinctValues(attribute_id, left_child); + } + break; + } case 
P::PhysicalType::kFilterJoin: { const P::FilterJoinPtr &filter_join = std::static_pointer_cast<const P::FilterJoin>(physical_plan); @@ -275,6 +304,17 @@ std::size_t StarSchemaSimpleCostModel::estimateNumDistinctValues( double StarSchemaSimpleCostModel::estimateSelectivity( const physical::PhysicalPtr &physical_plan) { switch (physical_plan->getPhysicalType()) { + case P::PhysicalType::kAggregate: { + const P::AggregatePtr &aggregate = + std::static_pointer_cast<const P::Aggregate>(physical_plan); + return estimateSelectivity(aggregate->input()) * + estimateSelectivityForFilterPredicate(aggregate); + } + case P::PhysicalType::kCrossReferenceCoalesceAggregate: { + const P::CrossReferenceCoalesceAggregatePtr &aggregate_on_left_outer_join = + std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan); + return estimateSelectivity(aggregate_on_left_outer_join->left_child()); + } case P::PhysicalType::kSelection: { const P::SelectionPtr &selection = std::static_pointer_cast<const P::Selection>(physical_plan); @@ -331,6 +371,7 @@ double StarSchemaSimpleCostModel::estimateSelectivity( double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate( const physical::PhysicalPtr &physical_plan) { + P::PhysicalPtr target_plan = physical_plan; E::PredicatePtr filter_predicate = nullptr; switch (physical_plan->getPhysicalType()) { case P::PhysicalType::kSelection: @@ -340,6 +381,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate( case P::PhysicalType::kAggregate: filter_predicate = std::static_pointer_cast<const P::Aggregate>(physical_plan)->filter_predicate(); + target_plan = physical_plan->children()[0]; break; case P::PhysicalType::kHashJoin: filter_predicate = @@ -356,7 +398,7 @@ double StarSchemaSimpleCostModel::estimateSelectivityForFilterPredicate( if (filter_predicate == nullptr) { return 1.0; } else { - return estimateSelectivityForPredicate(filter_predicate, physical_plan); + return 
estimateSelectivityForPredicate(filter_predicate, target_plan); } } @@ -443,6 +485,12 @@ bool StarSchemaSimpleCostModel::impliesUniqueAttributes( std::static_pointer_cast<const P::Aggregate>(physical_plan); return E::SubsetOfExpressions(aggregate->grouping_expressions(), attributes); } + case P::PhysicalType::kCrossReferenceCoalesceAggregate: { + const P::CrossReferenceCoalesceAggregatePtr &aggregate_on_left_outer_join = + std::static_pointer_cast<const P::CrossReferenceCoalesceAggregate>(physical_plan); + return E::SubsetOfExpressions( + aggregate_on_left_outer_join->left_join_attributes(), attributes); + } case P::PhysicalType::kHashJoin: { const P::HashJoinPtr &hash_join = std::static_pointer_cast<const P::HashJoin>(physical_plan); @@ -542,6 +590,103 @@ attribute_id StarSchemaSimpleCostModel::findCatalogRelationAttributeId( return kInvalidAttributeID; } +bool StarSchemaSimpleCostModel::canUseCollisionFreeAggregation( + const P::AggregatePtr &aggregate, + const std::size_t estimated_num_groups, + std::size_t *max_num_groups) { +#ifdef QUICKSTEP_DISTRIBUTED + // Currently we cannot do this fast path with the distributed setting. See + // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and + // FinalizeAggregationOperator::getAllWorkOrderProtos(). + return false; +#endif + + // Supports only single group-by key. + if (aggregate->grouping_expressions().size() != 1) { + return false; + } + + // We need to know the exact min/max stats of the group-by key. + // So it must be a CatalogAttribute (but not an expression). 
+ E::AttributeReferencePtr group_by_key_attr; + const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front(); + if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) { + return false; + } + + bool min_value_stat_is_exact; + bool max_value_stat_is_exact; + const TypedValue min_value = findMinValueStat( + aggregate, group_by_key_attr, &min_value_stat_is_exact); + const TypedValue max_value = findMaxValueStat( + aggregate, group_by_key_attr, &max_value_stat_is_exact); + if (min_value.isNull() || max_value.isNull() || + (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) { + return false; + } + + std::int64_t min_cpp_value; + std::int64_t max_cpp_value; + switch (group_by_key_attr->getValueType().getTypeID()) { + case TypeID::kInt: { + min_cpp_value = min_value.getLiteral<int>(); + max_cpp_value = max_value.getLiteral<int>(); + break; + } + case TypeID::kLong: { + min_cpp_value = min_value.getLiteral<std::int64_t>(); + max_cpp_value = max_value.getLiteral<std::int64_t>(); + break; + } + default: + return false; + } + + // TODO(jianqiao): + // 1. Handle the case where min_cpp_value is below 0 or far greater than 0. + // 2. Reason about the table size bound (e.g. by checking memory size) instead + // of hardcoding it as a gflag. + if (min_cpp_value < 0 || + max_cpp_value >= FLAGS_collision_free_vector_table_max_size || + max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) { + return false; + } + + for (const auto &agg_expr : aggregate->aggregate_expressions()) { + const E::AggregateFunctionPtr agg_func = + std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression()); + + if (agg_func->is_distinct()) { + return false; + } + + // TODO(jianqiao): Support AggregationID::AVG. 
+ if (!QUICKSTEP_EQUALS_ANY_CONSTANT(agg_func->getAggregate().getAggregationID(), + AggregationID::kCount, + AggregationID::kSum)) { + return false; + } + + const auto &arguments = agg_func->getArguments(); + if (arguments.size() > 1u) { + return false; + } + + if (arguments.size() == 1u) { + if (!QUICKSTEP_EQUALS_ANY_CONSTANT(arguments.front()->getValueType().getTypeID(), + TypeID::kInt, + TypeID::kLong, + TypeID::kFloat, + TypeID::kDouble)) { + return false; + } + } + } + + *max_num_groups = static_cast<std::size_t>(max_cpp_value) + 1; + return true; +} + } // namespace cost } // namespace optimizer } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp index cbe18f4..afb2ef9 100644 --- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp +++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp @@ -29,6 +29,7 @@ #include "query_optimizer/expressions/ExprId.hpp" #include "query_optimizer/expressions/Predicate.hpp" #include "query_optimizer/physical/Aggregate.hpp" +#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp" #include "query_optimizer/physical/NestedLoopsJoin.hpp" #include "query_optimizer/physical/FilterJoin.hpp" #include "query_optimizer/physical/HashJoin.hpp" @@ -166,10 +167,29 @@ class StarSchemaSimpleCostModel : public CostModel { physical_plan, attribute->id(), StatType::kMax, is_exact_stat); } + /** + * @brief Checks whether an aggregate node can be efficiently evaluated with + * the collision-free aggregation fast path. + * + * @param aggregate The physical aggregate node to be checked. + * @param estimated_num_groups The estimated number of groups for the aggregate. 
+ * @param max_num_groups If collision-free aggregation is applicable, the + * pointed content of this pointer will be set as the maximum possible + * number of groups that the collision-free hash table needs to hold. + * @return A bool value indicating whether collision-free aggregation can be + * used to evaluate \p aggregate. + */ + bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate, + const std::size_t estimated_num_groups, + std::size_t *max_num_groups); + private: std::size_t estimateCardinalityForAggregate( const physical::AggregatePtr &physical_plan); + std::size_t estimateCardinalityForCrossReferenceCoalesceAggregate( + const physical::CrossReferenceCoalesceAggregatePtr &physical_plan); + std::size_t estimateCardinalityForFilterJoin( const physical::FilterJoinPtr &physical_plan); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/physical/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/CMakeLists.txt b/query_optimizer/physical/CMakeLists.txt index f68ed39..77ae75e 100644 --- a/query_optimizer/physical/CMakeLists.txt +++ b/query_optimizer/physical/CMakeLists.txt @@ -21,6 +21,9 @@ add_library(quickstep_queryoptimizer_physical_BinaryJoin BinaryJoin.cpp BinaryJo add_library(quickstep_queryoptimizer_physical_CopyFrom CopyFrom.cpp CopyFrom.hpp) add_library(quickstep_queryoptimizer_physical_CreateIndex CreateIndex.cpp CreateIndex.hpp) add_library(quickstep_queryoptimizer_physical_CreateTable CreateTable.cpp CreateTable.hpp) +add_library(quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate + CrossReferenceCoalesceAggregate.cpp + CrossReferenceCoalesceAggregate.hpp) add_library(quickstep_queryoptimizer_physical_DeleteTuples DeleteTuples.cpp DeleteTuples.hpp) add_library(quickstep_queryoptimizer_physical_DropTable DropTable.cpp DropTable.hpp) add_library(quickstep_queryoptimizer_physical_FilterJoin FilterJoin.cpp
FilterJoin.hpp) @@ -95,6 +98,16 @@ target_link_libraries(quickstep_queryoptimizer_physical_CreateTable quickstep_queryoptimizer_physical_PhysicalType quickstep_utility_Cast quickstep_utility_Macros) +target_link_libraries(quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate + quickstep_queryoptimizer_OptimizerTree + quickstep_queryoptimizer_expressions_Alias + quickstep_queryoptimizer_expressions_AttributeReference + quickstep_queryoptimizer_expressions_ExpressionUtil + quickstep_queryoptimizer_expressions_Predicate + quickstep_queryoptimizer_physical_Physical + quickstep_queryoptimizer_physical_PhysicalType + quickstep_utility_Cast + quickstep_utility_Macros) target_link_libraries(quickstep_queryoptimizer_physical_DeleteTuples glog quickstep_catalog_CatalogRelation @@ -293,6 +306,7 @@ target_link_libraries(quickstep_queryoptimizer_physical quickstep_queryoptimizer_physical_CopyFrom quickstep_queryoptimizer_physical_CreateIndex quickstep_queryoptimizer_physical_CreateTable + quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate quickstep_queryoptimizer_physical_DeleteTuples quickstep_queryoptimizer_physical_DropTable quickstep_queryoptimizer_physical_FilterJoin http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp b/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp new file mode 100644 index 0000000..16700c0 --- /dev/null +++ b/query_optimizer/physical/CrossReferenceCoalesceAggregate.cpp @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp" + +#include <string> +#include <unordered_set> +#include <vector> + +#include "query_optimizer/OptimizerTree.hpp" +#include "query_optimizer/expressions/AttributeReference.hpp" +#include "query_optimizer/expressions/ExpressionUtil.hpp" +#include "utility/Cast.hpp" + +namespace quickstep { +namespace optimizer { +namespace physical { + +namespace E = ::quickstep::optimizer::expressions; + +std::vector<E::AttributeReferencePtr> CrossReferenceCoalesceAggregate + ::getOutputAttributes() const { + std::vector<E::AttributeReferencePtr> output_attributes(left_join_attributes_); + for (const auto &aggregate_expr : aggregate_expressions_) { + output_attributes.emplace_back(E::ToRef(aggregate_expr)); + } + return output_attributes; +} + +std::vector<E::AttributeReferencePtr> CrossReferenceCoalesceAggregate + ::getReferencedAttributes() const { + std::unordered_set<E::AttributeReferencePtr> referenced_attributes; + + referenced_attributes.insert(left_join_attributes_.begin(), + left_join_attributes_.end()); + referenced_attributes.insert(right_join_attributes_.begin(), + right_join_attributes_.end()); + + if (right_filter_predicate_ != nullptr) { + const std::vector<E::AttributeReferencePtr> attrs_in_predicate = + right_filter_predicate_->getReferencedAttributes(); + 
referenced_attributes.insert(attrs_in_predicate.begin(), + attrs_in_predicate.end()); + } + + for (const auto &aggregate_expr : aggregate_expressions_) { + const std::vector<E::AttributeReferencePtr> attrs_in_expr = + aggregate_expr->getReferencedAttributes(); + referenced_attributes.insert(attrs_in_expr.begin(), attrs_in_expr.end()); + } + + return std::vector<E::AttributeReferencePtr>( + referenced_attributes.begin(), referenced_attributes.end()); +} + +void CrossReferenceCoalesceAggregate::getFieldStringItems( + std::vector<std::string> *inline_field_names, + std::vector<std::string> *inline_field_values, + std::vector<std::string> *non_container_child_field_names, + std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields, + std::vector<std::string> *container_child_field_names, + std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const { + inline_field_names->push_back("hash_table_num_entries"); + inline_field_values->push_back(std::to_string(hash_table_num_entries_)); + + non_container_child_field_names->push_back("left_child"); + non_container_child_fields->push_back(left_child_); + non_container_child_field_names->push_back("right_child"); + non_container_child_fields->push_back(right_child_); + + container_child_field_names->push_back("left_join_attributes"); + container_child_fields->push_back( + CastSharedPtrVector<OptimizerTreeBase>(left_join_attributes_)); + container_child_field_names->push_back("right_join_attributes"); + container_child_fields->push_back( + CastSharedPtrVector<OptimizerTreeBase>(right_join_attributes_)); + + if (right_filter_predicate_ != nullptr) { + non_container_child_field_names->push_back("right_filter_predicate"); + non_container_child_fields->push_back(right_filter_predicate_); + } + container_child_field_names->push_back("aggregate_expressions"); + container_child_fields->push_back( + CastSharedPtrVector<OptimizerTreeBase>(aggregate_expressions_)); +} + +} // namespace physical +} // 
namespace optimizer +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp b/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp new file mode 100644 index 0000000..9fb02e2 --- /dev/null +++ b/query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/ + +#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_ +#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_ + +#include <memory> +#include <string> +#include <vector> + +#include "query_optimizer/OptimizerTree.hpp" +#include "query_optimizer/expressions/Alias.hpp" +#include "query_optimizer/expressions/AttributeReference.hpp" +#include "query_optimizer/expressions/ExpressionUtil.hpp" +#include "query_optimizer/expressions/Predicate.hpp" +#include "query_optimizer/physical/Physical.hpp" +#include "query_optimizer/physical/PhysicalType.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +namespace quickstep { +namespace optimizer { +namespace physical { + +/** \addtogroup OptimizerPhysical + * @{ + */ + +class CrossReferenceCoalesceAggregate; +typedef std::shared_ptr<const CrossReferenceCoalesceAggregate> CrossReferenceCoalesceAggregatePtr; + +/** + * @brief An Aggregate fused with the left outer HashJoin that feeds it (built by FuseAggregateJoin). + */ +class CrossReferenceCoalesceAggregate : public Physical { + public: + PhysicalType getPhysicalType() const override { + return PhysicalType::kCrossReferenceCoalesceAggregate; + } + + std::string getName() const override { + return "CrossReferenceCoalesceAggregate"; + } + + const PhysicalPtr& left_child() const { + return left_child_; + } + + const PhysicalPtr& right_child() const { + return right_child_; + } + + const std::vector<expressions::AttributeReferencePtr>& left_join_attributes() const { + return left_join_attributes_; + } + + const std::vector<expressions::AttributeReferencePtr>& right_join_attributes() const { + return right_join_attributes_; + } + + const expressions::PredicatePtr& right_filter_predicate() const { + return right_filter_predicate_; + } + + const std::vector<expressions::AliasPtr>& aggregate_expressions() const { + return aggregate_expressions_; + } + + inline std::size_t hash_table_num_entries() const { + return hash_table_num_entries_; + } + + PhysicalPtr copyWithNewChildren( + const
std::vector<PhysicalPtr> &new_children) const override { + DCHECK_EQ(getNumChildren(), new_children.size()); + return Create(new_children[0], + new_children[1], + left_join_attributes_, + right_join_attributes_, + right_filter_predicate_, + aggregate_expressions_, + hash_table_num_entries_); + } + + std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override; + + std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override; + + bool maybeCopyWithPrunedExpressions( + const expressions::UnorderedNamedExpressionSet &referenced_expressions, + PhysicalPtr *output) const override { + return false; + } + + static CrossReferenceCoalesceAggregatePtr Create( + const PhysicalPtr &left_child, + const PhysicalPtr &right_child, + const std::vector<expressions::AttributeReferencePtr> &left_join_attributes, + const std::vector<expressions::AttributeReferencePtr> &right_join_attributes, + const expressions::PredicatePtr right_filter_predicate, + const std::vector<expressions::AliasPtr> &aggregate_expressions, + const std::size_t hash_table_num_entries) { + return CrossReferenceCoalesceAggregatePtr( + new CrossReferenceCoalesceAggregate(left_child, + right_child, + left_join_attributes, + right_join_attributes, + right_filter_predicate, + aggregate_expressions, + hash_table_num_entries)); + } + + protected: + void getFieldStringItems( + std::vector<std::string> *inline_field_names, + std::vector<std::string> *inline_field_values, + std::vector<std::string> *non_container_child_field_names, + std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields, + std::vector<std::string> *container_child_field_names, + std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override; + + private: + CrossReferenceCoalesceAggregate( + const PhysicalPtr &left_child, + const PhysicalPtr &right_child, + const std::vector<expressions::AttributeReferencePtr> &left_join_attributes, + const 
std::vector<expressions::AttributeReferencePtr> &right_join_attributes, + const expressions::PredicatePtr right_filter_predicate, + const std::vector<expressions::AliasPtr> &aggregate_expressions, + const std::size_t hash_table_num_entries) + : left_child_(left_child), + right_child_(right_child), + left_join_attributes_(left_join_attributes), + right_join_attributes_(right_join_attributes), + right_filter_predicate_(right_filter_predicate), + aggregate_expressions_(aggregate_expressions), + hash_table_num_entries_(hash_table_num_entries) { + addChild(left_child_); + addChild(right_child_); + } + + PhysicalPtr left_child_; + PhysicalPtr right_child_; + std::vector<expressions::AttributeReferencePtr> left_join_attributes_; + std::vector<expressions::AttributeReferencePtr> right_join_attributes_; + expressions::PredicatePtr right_filter_predicate_; + std::vector<expressions::AliasPtr> aggregate_expressions_; + std::size_t hash_table_num_entries_; + + DISALLOW_COPY_AND_ASSIGN(CrossReferenceCoalesceAggregate); +}; + +/** @} */ + +} // namespace physical +} // namespace optimizer +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_CROSS_REFERENCE_COALESCE_AGGREGATE_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/physical/PatternMatcher.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/PatternMatcher.hpp b/query_optimizer/physical/PatternMatcher.hpp index 4336767..0204504 100644 --- a/query_optimizer/physical/PatternMatcher.hpp +++ b/query_optimizer/physical/PatternMatcher.hpp @@ -33,6 +33,7 @@ class Aggregate; class BinaryJoin; class CopyFrom; class CreateTable; +class CrossReferenceCoalesceAggregate; class DeleteTuples; class DropTable; class FilterJoin; @@ -112,6 +113,8 @@ using SomeAggregate = SomePhysicalNode<Aggregate, PhysicalType::kAggregate>; using SomeBinaryJoin = SomePhysicalNode<BinaryJoin, PhysicalType::kHashJoin, 
PhysicalType::kNestedLoopsJoin>; using SomeCopyFrom = SomePhysicalNode<CopyFrom, PhysicalType::kCopyFrom>; using SomeCreateTable = SomePhysicalNode<CreateTable, PhysicalType::kCreateTable>; +using SomeCrossReferenceCoalesceAggregate = SomePhysicalNode<CrossReferenceCoalesceAggregate, + PhysicalType::kCrossReferenceCoalesceAggregate>; using SomeDeleteTuples = SomePhysicalNode<DeleteTuples, PhysicalType::kDeleteTuples>; using SomeDropTable = SomePhysicalNode<DropTable, PhysicalType::kDropTable>; using SomeFilterJoin = SomePhysicalNode<FilterJoin, PhysicalType::kFilterJoin>; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/physical/PhysicalType.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/physical/PhysicalType.hpp b/query_optimizer/physical/PhysicalType.hpp index 1da5929..077bd54 100644 --- a/query_optimizer/physical/PhysicalType.hpp +++ b/query_optimizer/physical/PhysicalType.hpp @@ -36,6 +36,7 @@ enum class PhysicalType { kCopyFrom, kCreateIndex, kCreateTable, + kCrossReferenceCoalesceAggregate, kDeleteTuples, kDropTable, kFilterJoin, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/rules/BottomUpRule.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/BottomUpRule.hpp b/query_optimizer/rules/BottomUpRule.hpp index 53dff0d..6c14e64 100644 --- a/query_optimizer/rules/BottomUpRule.hpp +++ b/query_optimizer/rules/BottomUpRule.hpp @@ -57,21 +57,7 @@ class BottomUpRule : public Rule<TreeType> { DCHECK(tree != nullptr); init(tree); - std::vector<std::shared_ptr<const TreeType>> new_children; - bool has_changed_children = false; - for (const std::shared_ptr<const TreeType> &child : tree->children()) { - std::shared_ptr<const TreeType> new_child = apply(child); - if (child != new_child && !has_changed_children) { - has_changed_children = true; - } - 
new_children.push_back(new_child); - } - - if (has_changed_children) { - return applyToNode(tree->copyWithNewChildren(new_children)); - } else { - return applyToNode(tree); - } + return applyInternal(tree); } protected: @@ -89,10 +75,29 @@ class BottomUpRule : public Rule<TreeType> { * * @param input The input tree. */ - virtual void init(const TreeNodePtr &input) { - } + virtual void init(const TreeNodePtr &input) {} private: + TreeNodePtr applyInternal(const TreeNodePtr &tree) { + DCHECK(tree != nullptr); + + std::vector<std::shared_ptr<const TreeType>> new_children; + bool has_changed_children = false; + for (const std::shared_ptr<const TreeType> &child : tree->children()) { + std::shared_ptr<const TreeType> new_child = applyInternal(child); + if (child != new_child && !has_changed_children) { + has_changed_children = true; + } + new_children.push_back(new_child); + } + + if (has_changed_children) { + return applyToNode(tree->copyWithNewChildren(new_children)); + } else { + return applyToNode(tree); + } + } + DISALLOW_COPY_AND_ASSIGN(BottomUpRule); }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/rules/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt index 029d816..a614bde 100644 --- a/query_optimizer/rules/CMakeLists.txt +++ b/query_optimizer/rules/CMakeLists.txt @@ -21,6 +21,7 @@ add_subdirectory(tests) add_library(quickstep_queryoptimizer_rules_AttachLIPFilters AttachLIPFilters.cpp AttachLIPFilters.hpp) add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp BottomUpRule.hpp) add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp CollapseProject.hpp) +add_library(quickstep_queryoptimizer_rules_FuseAggregateJoin FuseAggregateJoin.cpp FuseAggregateJoin.hpp) add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp) 
add_library(quickstep_queryoptimizer_rules_InjectJoinFilters InjectJoinFilters.cpp InjectJoinFilters.hpp) add_library(quickstep_queryoptimizer_rules_PruneColumns PruneColumns.cpp PruneColumns.hpp) @@ -75,6 +76,27 @@ target_link_libraries(quickstep_queryoptimizer_rules_CollapseProject quickstep_queryoptimizer_rules_Rule quickstep_queryoptimizer_rules_RuleHelper quickstep_utility_Macros) +target_link_libraries(quickstep_queryoptimizer_rules_FuseAggregateJoin + quickstep_expressions_aggregation_AggregateFunction + quickstep_expressions_aggregation_AggregationID + quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel + quickstep_queryoptimizer_expressions_AggregateFunction + quickstep_queryoptimizer_expressions_Alias + quickstep_queryoptimizer_expressions_AttributeReference + quickstep_queryoptimizer_expressions_ExpressionUtil + quickstep_queryoptimizer_expressions_Predicate + quickstep_queryoptimizer_expressions_NamedExpression + quickstep_queryoptimizer_expressions_PatternMatcher + quickstep_queryoptimizer_physical_Aggregate + quickstep_queryoptimizer_physical_CrossReferenceCoalesceAggregate + quickstep_queryoptimizer_physical_HashJoin + quickstep_queryoptimizer_physical_PatternMatcher + quickstep_queryoptimizer_physical_Physical + quickstep_queryoptimizer_physical_PhysicalType + quickstep_queryoptimizer_physical_Selection + quickstep_queryoptimizer_physical_TopLevelPlan + quickstep_queryoptimizer_rules_BottomUpRule + quickstep_utility_Macros) target_link_libraries(quickstep_queryoptimizer_rules_GenerateJoins glog quickstep_queryoptimizer_expressions_AttributeReference @@ -288,6 +310,7 @@ target_link_libraries(quickstep_queryoptimizer_rules quickstep_queryoptimizer_rules_AttachLIPFilters quickstep_queryoptimizer_rules_BottomUpRule quickstep_queryoptimizer_rules_CollapseProject + quickstep_queryoptimizer_rules_FuseAggregateJoin quickstep_queryoptimizer_rules_GenerateJoins quickstep_queryoptimizer_rules_InjectJoinFilters 
quickstep_queryoptimizer_rules_PruneColumns http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/rules/FuseAggregateJoin.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/FuseAggregateJoin.cpp b/query_optimizer/rules/FuseAggregateJoin.cpp new file mode 100644 index 0000000..206188e --- /dev/null +++ b/query_optimizer/rules/FuseAggregateJoin.cpp @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/ + +#include "query_optimizer/rules/FuseAggregateJoin.hpp" + +#include <algorithm> +#include <cstddef> +#include <unordered_set> +#include <vector> + +#include "expressions/aggregation/AggregateFunction.hpp" +#include "expressions/aggregation/AggregationID.hpp" +#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp" +#include "query_optimizer/expressions/AggregateFunction.hpp" +#include "query_optimizer/expressions/Alias.hpp" +#include "query_optimizer/expressions/AttributeReference.hpp" +#include "query_optimizer/expressions/ExpressionUtil.hpp" +#include "query_optimizer/expressions/NamedExpression.hpp" +#include "query_optimizer/expressions/PatternMatcher.hpp" +#include "query_optimizer/expressions/Predicate.hpp" +#include "query_optimizer/physical/Aggregate.hpp" +#include "query_optimizer/physical/CrossReferenceCoalesceAggregate.hpp" +#include "query_optimizer/physical/HashJoin.hpp" +#include "query_optimizer/physical/PatternMatcher.hpp" +#include "query_optimizer/physical/Physical.hpp" +#include "query_optimizer/physical/PhysicalType.hpp" +#include "query_optimizer/physical/Selection.hpp" +#include "query_optimizer/physical/TopLevelPlan.hpp" + +#include "glog/logging.h" + +namespace quickstep { +namespace optimizer { + +namespace E = ::quickstep::optimizer::expressions; +namespace P = ::quickstep::optimizer::physical; + +P::PhysicalPtr FuseAggregateJoin::applyToNode( + const P::PhysicalPtr &node) { + P::AggregatePtr aggregate; + if (!P::SomeAggregate::MatchesWithConditionalCast(node, &aggregate) || + aggregate->filter_predicate() != nullptr) { + return node; + } + + P::HashJoinPtr hash_join; + if ((!P::SomeHashJoin::MatchesWithConditionalCast(aggregate->input(), &hash_join)) || + hash_join->join_type() != P::HashJoin::JoinType::kLeftOuterJoin || + hash_join->residual_predicate() != nullptr) { + return node; + } + + const std::vector<E::AttributeReferencePtr> &left_join_attributes = + hash_join->left_join_attributes(); + if 
(left_join_attributes.size() != 1u || + (!cost_model_->impliesUniqueAttributes(hash_join->left(), left_join_attributes))) { + return node; + } + + const std::vector<E::NamedExpressionPtr> &grouping_expressions = + aggregate->grouping_expressions(); + if (grouping_expressions.size() != 1u || + grouping_expressions.front()->id() != left_join_attributes.front()->id()) { + return node; + } + + std::unordered_set<E::ExprId> right_side_attr_ids; + for (const auto &attr : hash_join->right()->getOutputAttributes()) { + right_side_attr_ids.insert(attr->id()); + } + + const std::vector<E::AliasPtr> &aggregate_expressions = + aggregate->aggregate_expressions(); + for (const auto &expr : aggregate_expressions) { + const E::AggregateFunctionPtr aggr_expr = + std::static_pointer_cast<const E::AggregateFunction>(expr->expression()); + + const std::vector<E::ScalarPtr> &arguments = aggr_expr->getArguments(); + if (arguments.size() != 1u) { + return node; + } + + E::AttributeReferencePtr arg_attr; + if (!E::SomeAttributeReference::MatchesWithConditionalCast(arguments.front(), &arg_attr) || + right_side_attr_ids.find(arg_attr->id()) == right_side_attr_ids.end()) { + return node; + } + } + + const std::size_t estimated_num_groups = + cost_model_->estimateNumGroupsForAggregate(aggregate); + + std::size_t max_num_groups_left; + if (!cost_model_->canUseCollisionFreeAggregation(aggregate, + estimated_num_groups, + &max_num_groups_left)) { + return node; + } + + std::size_t max_num_groups_right; + if (!cost_model_->canUseCollisionFreeAggregation( + P::Aggregate::Create(hash_join->right(), + E::ToNamedExpressions(hash_join->right_join_attributes()), + aggregate->aggregate_expressions(), + nullptr), + estimated_num_groups, + &max_num_groups_right)) { + return node; + } + + // Fuse filter predicate. 
+ P::PhysicalPtr right_child = hash_join->right(); + const std::vector<E::AttributeReferencePtr> &right_join_attributes = + hash_join->right_join_attributes(); + E::PredicatePtr right_filter_predicate = nullptr; + + P::SelectionPtr selection; + if (P::SomeSelection::MatchesWithConditionalCast(right_child, &selection)) { + if (E::SubsetOfExpressions(right_join_attributes, + selection->input()->getOutputAttributes())) { + right_child = selection->input(); + right_filter_predicate = selection->filter_predicate(); + } + } + + const std::size_t max_num_groups = + std::max(max_num_groups_left, max_num_groups_right); + + return P::CrossReferenceCoalesceAggregate::Create(hash_join->left(), + right_child, + left_join_attributes, + right_join_attributes, + right_filter_predicate, + aggregate_expressions, + max_num_groups); +} + +void FuseAggregateJoin::init(const P::PhysicalPtr &input) { + DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan); + + const P::TopLevelPlanPtr top_level_plan = + std::static_pointer_cast<const P::TopLevelPlan>(input); + cost_model_.reset( + new cost::StarSchemaSimpleCostModel(top_level_plan->shared_subplans())); +} + +} // namespace optimizer +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/query_optimizer/rules/FuseAggregateJoin.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/rules/FuseAggregateJoin.hpp b/query_optimizer/rules/FuseAggregateJoin.hpp new file mode 100644 index 0000000..240aaf1 --- /dev/null +++ b/query_optimizer/rules/FuseAggregateJoin.hpp @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_ +#define QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_ + +#include <cstddef> +#include <memory> +#include <string> + +#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp" +#include "query_optimizer/physical/Aggregate.hpp" +#include "query_optimizer/physical/Physical.hpp" +#include "query_optimizer/rules/BottomUpRule.hpp" +#include "utility/Macros.hpp" + +namespace quickstep { +namespace optimizer { + +/** \addtogroup OptimizerRules + * @{ + */ + +class FuseAggregateJoin : public BottomUpRule<physical::Physical> { + public: + /** + * @brief Constructor. 
+ */ + FuseAggregateJoin() {} + + ~FuseAggregateJoin() override {} + + std::string getName() const override { + return "FuseAggregateJoin"; + } + + protected: + physical::PhysicalPtr applyToNode(const physical::PhysicalPtr &node) override; + + void init(const physical::PhysicalPtr &input) override; + + private: + std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_; + + DISALLOW_COPY_AND_ASSIGN(FuseAggregateJoin); +}; + +/** @} */ + +} // namespace optimizer +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_AGGREGATE_JOIN_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/relational_operators/BuildAggregationExistenceMapOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/BuildAggregationExistenceMapOperator.cpp b/relational_operators/BuildAggregationExistenceMapOperator.cpp new file mode 100644 index 0000000..6a77696 --- /dev/null +++ b/relational_operators/BuildAggregationExistenceMapOperator.cpp @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/ + +#include "relational_operators/BuildAggregationExistenceMapOperator.hpp" + +#include <vector> + +#include "catalog/CatalogRelation.hpp" +#include "query_execution/QueryContext.hpp" +#include "query_execution/WorkOrderProtosContainer.hpp" +#include "query_execution/WorkOrdersContainer.hpp" +#include "relational_operators/WorkOrder.pb.h" +#include "storage/AggregationOperationState.hpp" +#include "storage/StorageBlock.hpp" +#include "storage/StorageBlockInfo.hpp" +#include "storage/StorageManager.hpp" +#include "storage/ValueAccessor.hpp" +#include "storage/ValueAccessorUtil.hpp" +#include "types/Type.hpp" +#include "types/TypeID.hpp" +#include "utility/BarrieredReadWriteConcurrentBitVector.hpp" + +#include "tmb/id_typedefs.h" + +namespace quickstep { + +namespace { + +template <typename CppType, bool is_attr_nullable> +void ExecuteBuild(const attribute_id attr_id, + ValueAccessor *accessor, + BarrieredReadWriteConcurrentBitVector *existence_map) { + InvokeOnAnyValueAccessor( + accessor, + [&](auto *accessor) -> void { // NOLINT(build/c++11) + accessor->beginIteration(); + while (accessor->next()) { + const void *value = accessor->template getUntypedValue<is_attr_nullable>(attr_id); + if (!is_attr_nullable || value != nullptr) { + existence_map->setBit(*reinterpret_cast<const CppType *>(value)); + } + } + }); +} + +template <typename CppType> +void ExecuteHelper(const attribute_id attr_id, + const bool is_attr_nullable, + ValueAccessor *accessor, + BarrieredReadWriteConcurrentBitVector *existence_map) { + if (is_attr_nullable) { + ExecuteBuild<CppType, true>(attr_id, accessor, existence_map); + } else { + ExecuteBuild<CppType, false>(attr_id, accessor, existence_map); + } +} + +} + +bool BuildAggregationExistenceMapOperator::getAllWorkOrders( + WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus) { + if (input_relation_is_stored_) { + if 
(!started_) { + for (const block_id input_block_id : input_relation_block_ids_) { + container->addNormalWorkOrder( + new BuildAggregationExistenceMapWorkOrder( + query_id_, + input_relation_, + input_block_id, + build_attribute_, + query_context->getAggregationState(aggr_state_index_), + storage_manager), + op_index_); + } + started_ = true; + } + return true; + } else { + while (num_workorders_generated_ < input_relation_block_ids_.size()) { + container->addNormalWorkOrder( + new BuildAggregationExistenceMapWorkOrder( + query_id_, + input_relation_, + input_relation_block_ids_[num_workorders_generated_], + build_attribute_, + query_context->getAggregationState(aggr_state_index_), + storage_manager), + op_index_); + ++num_workorders_generated_; + } + return done_feeding_input_relation_; + } +} + +bool BuildAggregationExistenceMapOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { + LOG(FATAL) << "Not implemented"; +} + +void BuildAggregationExistenceMapWorkOrder::execute() { + BlockReference block( + storage_manager_->getBlock(build_block_id_, input_relation_)); + + std::unique_ptr<ValueAccessor> accessor( + block->getTupleStorageSubBlock().createValueAccessor()); + + const Type &attr_type = + input_relation_.getAttributeById(build_attribute_)->getType(); + switch (attr_type.getTypeID()) { + case TypeID::kInt: + ExecuteHelper<int>(build_attribute_, + attr_type.isNullable(), + accessor.get(), + state_->getExistenceMap()); + return; + case TypeID::kLong: + ExecuteHelper<std::int64_t>(build_attribute_, + attr_type.isNullable(), + accessor.get(), + state_->getExistenceMap()); + return; + default: + LOG(FATAL) << "Build attribute type not supported by " + << "BuildAggregationExistenceMapOperator: " + << attr_type.getName(); + } +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccd707b3/relational_operators/BuildAggregationExistenceMapOperator.hpp 
---------------------------------------------------------------------- diff --git a/relational_operators/BuildAggregationExistenceMapOperator.hpp b/relational_operators/BuildAggregationExistenceMapOperator.hpp new file mode 100644 index 0000000..2d13bda --- /dev/null +++ b/relational_operators/BuildAggregationExistenceMapOperator.hpp @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ **/ + +#ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_ +#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_ + +#include <string> +#include <vector> + +#include "catalog/CatalogRelation.hpp" +#include "catalog/CatalogTypedefs.hpp" +#include "query_execution/QueryContext.hpp" +#include "relational_operators/RelationalOperator.hpp" +#include "relational_operators/WorkOrder.hpp" +#include "storage/StorageBlockInfo.hpp" +#include "utility/Macros.hpp" + +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" + +namespace tmb { class MessageBus; } + +namespace quickstep { + +class AggregationOperationState; +class CatalogRelationSchema; +class StorageManager; +class WorkOrderProtosContainer; +class WorkOrdersContainer; + +namespace serialization { class WorkOrder; } + +/** \addtogroup RelationalOperators + * @{ + */ + +class BuildAggregationExistenceMapOperator : public RelationalOperator { + public: + BuildAggregationExistenceMapOperator(const std::size_t query_id, + const CatalogRelation &input_relation, + const attribute_id build_attribute, + const bool input_relation_is_stored, + const QueryContext::aggregation_state_id aggr_state_index) + : RelationalOperator(query_id), + input_relation_(input_relation), + build_attribute_(build_attribute), + input_relation_is_stored_(input_relation_is_stored), + input_relation_block_ids_(input_relation_is_stored ? 
input_relation.getBlocksSnapshot() + : std::vector<block_id>()), + aggr_state_index_(aggr_state_index), + num_workorders_generated_(0), + started_(false) {} + + ~BuildAggregationExistenceMapOperator() override {} + + std::string getName() const override { + return "BuildAggregationExistenceMapOperator"; + } + + const CatalogRelation& input_relation() const { + return input_relation_; + } + + bool getAllWorkOrders(WorkOrdersContainer *container, + QueryContext *query_context, + StorageManager *storage_manager, + const tmb::client_id scheduler_client_id, + tmb::MessageBus *bus) override; + + bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override; + + void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id, + const partition_id part_id) override { + input_relation_block_ids_.push_back(input_block_id); + } + + private: + serialization::WorkOrder* createWorkOrderProto(const block_id block); + + const CatalogRelation &input_relation_; + const attribute_id build_attribute_; + const bool input_relation_is_stored_; + std::vector<block_id> input_relation_block_ids_; + const QueryContext::aggregation_state_id aggr_state_index_; + + std::vector<block_id>::size_type num_workorders_generated_; + bool started_; + + DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapOperator); +}; + +/** + * @brief A WorkOrder produced by BuildAggregationExistenceMapOperator. 
+ **/ +class BuildAggregationExistenceMapWorkOrder : public WorkOrder { + public: + BuildAggregationExistenceMapWorkOrder(const std::size_t query_id, + const CatalogRelationSchema &input_relation, + const block_id build_block_id, + const attribute_id build_attribute, + AggregationOperationState *state, + StorageManager *storage_manager) + : WorkOrder(query_id), + input_relation_(input_relation), + build_block_id_(build_block_id), + build_attribute_(build_attribute), + state_(DCHECK_NOTNULL(state)), + storage_manager_(DCHECK_NOTNULL(storage_manager)) {} + + ~BuildAggregationExistenceMapWorkOrder() override {} + + void execute() override; + + private: + const CatalogRelationSchema &input_relation_; + const block_id build_block_id_; + const attribute_id build_attribute_; + AggregationOperationState *state_; + + StorageManager *storage_manager_; + + DISALLOW_COPY_AND_ASSIGN(BuildAggregationExistenceMapWorkOrder); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_RELATIONAL_OPERATORS_BUILD_AGGREGATION_EXISTENCE_MAP_OPERATOR_HPP_