Scheduled WorkOrders with the same aggr_state_index on the same Shiftboss.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/57730e40 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/57730e40 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/57730e40 Branch: refs/heads/exact-filter Commit: 57730e406533fa02280568380c853ce1c5dc19ee Parents: e75c265 Author: Zuyu Zhang <zu...@apache.org> Authored: Sun Nov 27 17:04:30 2016 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Sun Nov 27 17:28:30 2016 -0800 ---------------------------------------------------------------------- query_execution/ForemanDistributed.cpp | 61 ++++++++++++++++------ query_execution/ForemanDistributed.hpp | 4 ++ query_execution/PolicyEnforcerDistributed.cpp | 12 +++++ query_execution/PolicyEnforcerDistributed.hpp | 18 +++++++ query_execution/QueryManagerDistributed.hpp | 23 ++++++++ 5 files changed, 102 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57730e40/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp index fc9cd3c..61f0603 100644 --- a/query_execution/ForemanDistributed.cpp +++ b/query_execution/ForemanDistributed.cpp @@ -243,6 +243,32 @@ bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id messag kWorkOrderFeedbackMessage); } +bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage &proto, + const size_t next_shiftboss_index_to_schedule, + size_t *shiftboss_index_for_aggregation) { + const S::WorkOrder &work_order_proto = proto.work_order(); + QueryContext::aggregation_state_id aggr_state_index; + + switch (work_order_proto.work_order_type()) { + case S::AGGREGATION: + aggr_state_index = 
work_order_proto.GetExtension(S::AggregationWorkOrder::aggr_state_index); + break; + case S::FINALIZE_AGGREGATION: + aggr_state_index = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::aggr_state_index); + break; + case S::DESTROY_AGGREGATION_STATE: + aggr_state_index = work_order_proto.GetExtension(S::DestroyAggregationStateWorkOrder::aggr_state_index); + break; + default: + return false; + } + + static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForAggregation( + proto.query_id(), aggr_state_index, next_shiftboss_index_to_schedule, shiftboss_index_for_aggregation); + + return true; +} + bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &proto, const size_t next_shiftboss_index_to_schedule, size_t *shiftboss_index_for_hash_join) { @@ -251,13 +277,13 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p switch (work_order_proto.work_order_type()) { case S::BUILD_HASH: - join_hash_table_index = work_order_proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index); - break; - case S::DESTROY_HASH: - join_hash_table_index = work_order_proto.GetExtension(serialization::DestroyHashWorkOrder::join_hash_table_index); + join_hash_table_index = work_order_proto.GetExtension(S::BuildHashWorkOrder::join_hash_table_index); break; case S::HASH_JOIN: - join_hash_table_index = work_order_proto.GetExtension(serialization::HashJoinWorkOrder::join_hash_table_index); + join_hash_table_index = work_order_proto.GetExtension(S::HashJoinWorkOrder::join_hash_table_index); + break; + case S::DESTROY_HASH: + join_hash_table_index = work_order_proto.GetExtension(S::DestroyHashWorkOrder::join_hash_table_index); break; default: return false; @@ -275,20 +301,23 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo for (const auto &message : messages) { DCHECK(message != nullptr); const S::WorkOrderMessage &proto = *message; - size_t 
shiftboss_index_for_hash_join; - if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_hash_join)) { - sendWorkOrderMessage(shiftboss_index_for_hash_join, proto); - shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_hash_join); - - if (shiftboss_index == shiftboss_index_for_hash_join) { - shiftboss_index = (shiftboss_index + 1) % num_shiftbosses; - } + size_t shiftboss_index_for_particular_work_order_type; + if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) { + } else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) { } else { - sendWorkOrderMessage(shiftboss_index, proto); - shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index); - // TODO(zuyu): Take data-locality into account for scheduling. + shiftboss_index_for_particular_work_order_type = shiftboss_index; + } + + sendWorkOrderMessage(shiftboss_index_for_particular_work_order_type, proto); + shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_particular_work_order_type); + + if (shiftboss_index == shiftboss_index_for_particular_work_order_type) { shiftboss_index = (shiftboss_index + 1) % num_shiftbosses; + } else { + // NOTE(zuyu): This is not the exact round-robin scheduling, as in this case, + // <shiftboss_index_for_particular_work_order_type> might be scheduled one + // more WorkOrder for an Aggregation or a HashJoin. 
} } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57730e40/query_execution/ForemanDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp index 0616f30..34bac07 100644 --- a/query_execution/ForemanDistributed.hpp +++ b/query_execution/ForemanDistributed.hpp @@ -71,6 +71,10 @@ class ForemanDistributed final : public ForemanBase { void run() override; private: + bool isAggregationRelatedWorkOrder(const serialization::WorkOrderMessage &proto, + const std::size_t next_shiftboss_index_to_schedule, + std::size_t *shiftboss_index_for_aggregation); + bool isHashJoinRelatedWorkOrder(const serialization::WorkOrderMessage &proto, const std::size_t next_shiftboss_index_to_schedule, std::size_t *shiftboss_index_for_hash_join); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57730e40/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp index 86b36c8..c5642bc 100644 --- a/query_execution/PolicyEnforcerDistributed.cpp +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -158,6 +158,18 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb: } } +void PolicyEnforcerDistributed::getShiftbossIndexForAggregation( + const std::size_t query_id, + const QueryContext::aggregation_state_id aggr_state_index, + const std::size_t next_shiftboss_index_to_schedule, + std::size_t *shiftboss_index) { + DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); + QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get()); + query_manager->getShiftbossIndexForAggregation(aggr_state_index, + next_shiftboss_index_to_schedule, + shiftboss_index); +} + void 
PolicyEnforcerDistributed::getShiftbossIndexForHashJoin( const std::size_t query_id, const QueryContext::join_hash_table_id join_hash_table_index, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57730e40/query_execution/PolicyEnforcerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp index 37326bd..e8bc394 100644 --- a/query_execution/PolicyEnforcerDistributed.hpp +++ b/query_execution/PolicyEnforcerDistributed.hpp @@ -90,6 +90,24 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase { void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message); /** + * @brief Get or set the index of Shiftboss for an Aggregation related + * WorkOrder. If it is the first Aggregation on <aggr_state_index>, + * <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>. + * Otherwise, <shiftboss_index> will be set to the index of the Shiftboss that + * has executed the first Aggregation. + * + * @param query_id The query id. + * @param aggr_state_index The Hash Table for the Aggregation. + * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder. + * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder. + **/ + void getShiftbossIndexForAggregation( + const std::size_t query_id, + const QueryContext::aggregation_state_id aggr_state_index, + const std::size_t next_shiftboss_index_to_schedule, + std::size_t *shiftboss_index); + + /** * @brief Get or set the index of Shiftboss for a HashJoin related WorkOrder. * If it is the first BuildHash on <join_hash_table_index>, <shiftboss_index> * will be set to <next_shiftboss_index_to_schedule>. 
Otherwise, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/57730e40/query_execution/QueryManagerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp index 2b21303..7a07fcb 100644 --- a/query_execution/QueryManagerDistributed.hpp +++ b/query_execution/QueryManagerDistributed.hpp @@ -95,6 +95,26 @@ class QueryManagerDistributed final : public QueryManagerBase { const dag_node_index start_operator_index); /** + * @brief Get the index of Shiftboss for an Aggregation related WorkOrder. If + * the Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>. + * + * @param aggr_state_index The Hash Table for the Aggregation. + * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder. + * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder. + **/ + void getShiftbossIndexForAggregation(const QueryContext::aggregation_state_id aggr_state_index, + const std::size_t next_shiftboss_index_to_schedule, + std::size_t *shiftboss_index) { + const auto cit = shiftboss_indexes_for_aggrs_.find(aggr_state_index); + if (cit != shiftboss_indexes_for_aggrs_.end()) { + *shiftboss_index = cit->second; + } else { + shiftboss_indexes_for_aggrs_.emplace(aggr_state_index, next_shiftboss_index_to_schedule); + *shiftboss_index = next_shiftboss_index_to_schedule; + } + } + + /** * @brief Get the index of Shiftboss for a HashJoin related WorkOrder. If the * Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>. * @@ -136,6 +156,9 @@ class QueryManagerDistributed final : public QueryManagerBase { std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_; + // A map from an aggregation id to its scheduled Shiftboss index. 
+ std::unordered_map<QueryContext::aggregation_state_id, std::size_t> shiftboss_indexes_for_aggrs_; + // A map from a join hash table to its scheduled Shiftboss index. std::unordered_map<QueryContext::join_hash_table_id, std::size_t> shiftboss_indexes_for_hash_joins_;