Repository: incubator-quickstep Updated Branches: refs/heads/master 7727e7735 -> 929e5f1dd
Simplified the SelectOperator w/ partitions. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/929e5f1d Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/929e5f1d Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/929e5f1d Branch: refs/heads/master Commit: 929e5f1dd178538725eac1644712828e9dc5843d Parents: 7727e77 Author: Zuyu Zhang <zu...@apache.org> Authored: Mon Jan 30 00:35:12 2017 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Thu Feb 2 19:13:30 2017 -0800 ---------------------------------------------------------------------- query_optimizer/ExecutionGenerator.cpp | 30 +++++++--- relational_operators/SelectOperator.cpp | 84 +++++++++++----------------- relational_operators/SelectOperator.hpp | 67 ++++++++++------------ 3 files changed, 84 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/929e5f1d/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index ce1452e..b73de39 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -551,6 +551,13 @@ void ExecutionGenerator::convertSelection( const CatalogRelationInfo *input_relation_info = findRelationInfoOutputByPhysical(physical_selection->input()); DCHECK(input_relation_info != nullptr); + const CatalogRelation &input_relation = *input_relation_info->relation; + const PartitionScheme *input_partition_scheme = input_relation.getPartitionScheme(); + + const std::size_t num_partitions = + input_partition_scheme + ? input_partition_scheme->getPartitionSchemeHeader().getNumPartitions() + : 1u; // Use the "simple" form of the selection operator (a pure projection that // doesn't require any expression evaluation or intermediate copies) if @@ -559,19 +566,21 @@ void ExecutionGenerator::convertSelection( SelectOperator *op = convertSimpleProjection(project_expressions_group_index, &attributes) ? new SelectOperator(query_handle_->query_id(), - *input_relation_info->relation, + input_relation, *output_relation, insert_destination_index, execution_predicate_index, move(attributes), - input_relation_info->isStoredRelation()) + input_relation_info->isStoredRelation(), + num_partitions) : new SelectOperator(query_handle_->query_id(), - *input_relation_info->relation, + input_relation, *output_relation, insert_destination_index, execution_predicate_index, project_expressions_group_index, - input_relation_info->isStoredRelation()); + input_relation_info->isStoredRelation(), + num_partitions); const QueryPlan::DAGNodeIndex select_index = execution_plan_->addRelationalOperator(op); @@ -1310,7 +1319,13 @@ void ExecutionGenerator::convertInsertSelection( const CatalogRelationInfo *selection_relation_info = findRelationInfoOutputByPhysical(physical_plan->selection()); - const CatalogRelation *selection_relation = selection_relation_info->relation; + const CatalogRelation &selection_relation = *selection_relation_info->relation; + const PartitionScheme *selection_partition_scheme = selection_relation.getPartitionScheme(); + + const std::size_t num_partitions = + selection_partition_scheme + ? selection_partition_scheme->getPartitionSchemeHeader().getNumPartitions() + : 1u; // Prepare the attributes, which are output columns of the selection relation. std::vector<attribute_id> attributes; @@ -1331,12 +1346,13 @@ void ExecutionGenerator::convertInsertSelection( // physical plan by modifying class Physical. SelectOperator *insert_selection_op = new SelectOperator(query_handle_->query_id(), - *selection_relation, + selection_relation, destination_relation, insert_destination_index, QueryContext::kInvalidPredicateId, move(attributes), - selection_relation_info->isStoredRelation()); + selection_relation_info->isStoredRelation(), + num_partitions); const QueryPlan::DAGNodeIndex insert_selection_index = execution_plan_->addRelationalOperator(insert_selection_op); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/929e5f1d/relational_operators/SelectOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp index 5419cf8..b63f0be 100644 --- a/relational_operators/SelectOperator.cpp +++ b/relational_operators/SelectOperator.cpp @@ -66,64 +66,40 @@ bool SelectOperator::getAllWorkOrders( return true; } - if (input_relation_.hasPartitionScheme()) { - for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { - for (const block_id input_block_id : input_relation_block_ids_in_partition_[part_id]) { - numa_node_id numa_node = 0; + for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { + for (const block_id input_block_id : input_relation_block_ids_[part_id]) { + numa_node_id numa_node = 0; #ifdef QUICKSTEP_HAVE_LIBNUMA - if (input_relation_.hasNUMAPlacementScheme()) { - numa_node = placement_scheme_->getNUMANodeForBlock(input_block_id); - } -#endif // QUICKSTEP_HAVE_LIBNUMA - container->addNormalWorkOrder( - new SelectWorkOrder(query_id_, input_relation_, input_block_id, predicate, simple_projection_, - simple_selection_, selection, output_destination, storage_manager, - CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context), numa_node), - op_index_); + if (input_relation_.hasNUMAPlacementScheme()) { + numa_node = placement_scheme_->getNUMANodeForBlock(input_block_id); } - } - } else { - for (const block_id input_block_id : input_relation_block_ids_) { +#endif // QUICKSTEP_HAVE_LIBNUMA container->addNormalWorkOrder( new SelectWorkOrder(query_id_, input_relation_, input_block_id, predicate, simple_projection_, simple_selection_, selection, output_destination, storage_manager, - CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)), + CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context), numa_node), op_index_); } } started_ = true; return true; } else { - if (input_relation_.hasPartitionScheme()) { - for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { - while (num_workorders_generated_in_partition_[part_id] < - input_relation_block_ids_in_partition_[part_id].size()) { - const block_id block_in_partition - = input_relation_block_ids_in_partition_[part_id][num_workorders_generated_in_partition_[part_id]]; - - numa_node_id numa_node = 0; + for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { + while (num_workorders_generated_[part_id] < input_relation_block_ids_[part_id].size()) { + const block_id block = input_relation_block_ids_[part_id][num_workorders_generated_[part_id]]; + + numa_node_id numa_node = 0; #ifdef QUICKSTEP_HAVE_LIBNUMA - if (input_relation_.hasNUMAPlacementScheme()) { - numa_node = placement_scheme_->getNUMANodeForBlock(block_in_partition); - } -#endif // QUICKSTEP_HAVE_LIBNUMA - container->addNormalWorkOrder( - new SelectWorkOrder(query_id_, input_relation_, block_in_partition, predicate, simple_projection_, - simple_selection_, selection, output_destination, storage_manager, - CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context), numa_node), - op_index_); - ++num_workorders_generated_in_partition_[part_id]; + if (input_relation_.hasNUMAPlacementScheme()) { + numa_node = placement_scheme_->getNUMANodeForBlock(block); } - } - } else { - while (num_workorders_generated_ < input_relation_block_ids_.size()) { +#endif // QUICKSTEP_HAVE_LIBNUMA container->addNormalWorkOrder( - new SelectWorkOrder(query_id_, input_relation_, input_relation_block_ids_[num_workorders_generated_], - predicate, simple_projection_, simple_selection_, selection, output_destination, - storage_manager, - CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)), + new SelectWorkOrder(query_id_, input_relation_, block, predicate, simple_projection_, + simple_selection_, selection, output_destination, storage_manager, + CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context), numa_node), op_index_); - ++num_workorders_generated_; + ++num_workorders_generated_[part_id]; } } return done_feeding_input_relation_; @@ -132,19 +108,25 @@ bool SelectOperator::getAllWorkOrders( bool SelectOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) { if (input_relation_is_stored_) { - if (!started_) { - for (const block_id input_block_id : input_relation_block_ids_) { + if (started_) { + return true; + } + + for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { + for (const block_id input_block_id : input_relation_block_ids_[part_id]) { container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_); } - started_ = true; } + started_ = true; return true; } else { - while (num_workorders_generated_ < input_relation_block_ids_.size()) { - container->addWorkOrderProto( - createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]), - op_index_); - ++num_workorders_generated_; + for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { + while (num_workorders_generated_[part_id] < input_relation_block_ids_[part_id].size()) { + container->addWorkOrderProto( + createWorkOrderProto(input_relation_block_ids_[part_id][num_workorders_generated_[part_id]]), + op_index_); + ++num_workorders_generated_[part_id]; + } } return done_feeding_input_relation_; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/929e5f1d/relational_operators/SelectOperator.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp index 79ab37f..b9a4d49 100644 --- a/relational_operators/SelectOperator.hpp +++ b/relational_operators/SelectOperator.hpp @@ -85,6 +85,8 @@ class SelectOperator : public RelationalOperator { * @param input_relation_is_stored If input_relation is a stored relation and * is fully available to the operator before it can start generating * workorders. + * @param num_partitions The number of partitions in 'input_relation'. + * If no partitions, it is one. **/ SelectOperator( const std::size_t query_id, @@ -93,36 +95,33 @@ class SelectOperator : public RelationalOperator { const QueryContext::insert_destination_id output_destination_index, const QueryContext::predicate_id predicate_index, const QueryContext::scalar_group_id selection_index, - const bool input_relation_is_stored) + const bool input_relation_is_stored, + const std::size_t num_partitions) : RelationalOperator(query_id), input_relation_(input_relation), output_relation_(output_relation), output_destination_index_(output_destination_index), predicate_index_(predicate_index), selection_index_(selection_index), - num_workorders_generated_(0), + num_partitions_(num_partitions), + input_relation_block_ids_(num_partitions), + num_workorders_generated_(num_partitions), simple_projection_(false), input_relation_is_stored_(input_relation_is_stored), started_(false) { #ifdef QUICKSTEP_HAVE_LIBNUMA placement_scheme_ = input_relation.getNUMAPlacementSchemePtr(); #endif - if (input_relation.hasPartitionScheme()) { - const PartitionScheme &part_scheme = *input_relation.getPartitionScheme(); - num_partitions_ = part_scheme.getPartitionSchemeHeader().getNumPartitions(); + if (input_relation_is_stored) { + if (input_relation.hasPartitionScheme()) { + const PartitionScheme &part_scheme = *input_relation.getPartitionScheme(); - num_workorders_generated_in_partition_.resize(num_partitions_); - - if (input_relation_is_stored) { for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { - input_relation_block_ids_in_partition_.push_back( - part_scheme.getBlocksInPartition(part_id)); + input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id); } } else { - input_relation_block_ids_in_partition_.resize(num_partitions_); + input_relation_block_ids_[0] = input_relation.getBlocksSnapshot(); } - } else if (input_relation_is_stored) { - input_relation_block_ids_ = input_relation.getBlocksSnapshot(); } } @@ -144,6 +143,8 @@ class SelectOperator : public RelationalOperator { * @param input_relation_is_stored If input_relation is a stored relation and * is fully available to the operator before it can start generating * workorders. + * @param num_partitions The number of partitions in 'input_relation'. + * If no partitions, it is one. **/ SelectOperator( const std::size_t query_id, @@ -152,7 +153,8 @@ class SelectOperator : public RelationalOperator { const QueryContext::insert_destination_id output_destination_index, const QueryContext::predicate_id predicate_index, std::vector<attribute_id> &&selection, - const bool input_relation_is_stored) + const bool input_relation_is_stored, + const std::size_t num_partitions) : RelationalOperator(query_id), input_relation_(input_relation), output_relation_(output_relation), @@ -160,29 +162,25 @@ class SelectOperator : public RelationalOperator { predicate_index_(predicate_index), selection_index_(QueryContext::kInvalidScalarGroupId), simple_selection_(std::move(selection)), - num_workorders_generated_(0), + num_partitions_(num_partitions), + input_relation_block_ids_(num_partitions), + num_workorders_generated_(num_partitions), simple_projection_(true), input_relation_is_stored_(input_relation_is_stored), started_(false) { #ifdef QUICKSTEP_HAVE_LIBNUMA placement_scheme_ = input_relation.getNUMAPlacementSchemePtr(); #endif - if (input_relation.hasPartitionScheme()) { - const PartitionScheme &part_scheme = *input_relation.getPartitionScheme(); - num_partitions_ = part_scheme.getPartitionSchemeHeader().getNumPartitions(); - - num_workorders_generated_in_partition_.resize(num_partitions_); + if (input_relation_is_stored) { + if (input_relation.hasPartitionScheme()) { + const PartitionScheme &part_scheme = *input_relation.getPartitionScheme(); - if (input_relation_is_stored) { for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) { - input_relation_block_ids_in_partition_.push_back( - part_scheme.getBlocksInPartition(part_id)); + input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id); } } else { - input_relation_block_ids_in_partition_.resize(num_partitions_); + input_relation_block_ids_[0] = input_relation.getBlocksSnapshot(); } - } else if (input_relation_is_stored) { - input_relation_block_ids_ = input_relation.getBlocksSnapshot(); } } @@ -206,11 +204,7 @@ class SelectOperator : public RelationalOperator { void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id, const partition_id part_id) override { - if (input_relation_.hasPartitionScheme()) { - input_relation_block_ids_in_partition_[part_id].push_back(input_block_id); - } else { - input_relation_block_ids_.push_back(input_block_id); - } + input_relation_block_ids_[part_id].push_back(input_block_id); } QueryContext::insert_destination_id getInsertDestinationID() const override { @@ -237,17 +231,12 @@ class SelectOperator : public RelationalOperator { const QueryContext::scalar_group_id selection_index_; const std::vector<attribute_id> simple_selection_; - std::vector<block_id> input_relation_block_ids_; - // A single workorder is generated for each block of input relation. - std::vector<block_id>::size_type num_workorders_generated_; - - // Used for the partition case only. + const std::size_t num_partitions_; // A vector of vectors V where V[i] indicates the list of block IDs of the // input relation that belong to the partition i. - std::vector<std::vector<block_id>> input_relation_block_ids_in_partition_; + std::vector<std::vector<block_id>> input_relation_block_ids_; // A single workorder is generated for each block in each partition of input relation. - std::vector<std::size_t> num_workorders_generated_in_partition_; - std::size_t num_partitions_; + std::vector<std::size_t> num_workorders_generated_; const bool simple_projection_; const bool input_relation_is_stored_;