Schedule FinalizeAggr / DestroyAggr WorkOrders on the same Shiftboss as their AggrWorkOrders.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/a98e4a14 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/a98e4a14 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/a98e4a14 Branch: refs/heads/multi-shiftboss-test Commit: a98e4a147781ea883daeb8c6223b60ce0f839a26 Parents: 4d935f4 Author: Zuyu Zhang <zu...@apache.org> Authored: Thu Nov 17 12:01:26 2016 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Sun Nov 27 10:50:04 2016 -0800 ---------------------------------------------------------------------- query_execution/ForemanDistributed.cpp | 71 +++++++++++++++++----- query_execution/ForemanDistributed.hpp | 9 ++- query_execution/PolicyEnforcerDistributed.cpp | 12 ++++ query_execution/PolicyEnforcerDistributed.hpp | 18 ++++++ query_execution/QueryManagerDistributed.hpp | 23 +++++++ 5 files changed, 116 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a98e4a14/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp index 900a71f..7ca246d 100644 --- a/query_execution/ForemanDistributed.cpp +++ b/query_execution/ForemanDistributed.cpp @@ -243,28 +243,55 @@ bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id messag kWorkOrderFeedbackMessage); } -bool ForemanDistributed::isHashJoinRelatedWorkOrder(const unique_ptr<S::WorkOrderMessage> &message, +bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage &proto, + const size_t next_shiftboss_index_to_schedule, + size_t *shiftboss_index_for_aggregation) { + const S::WorkOrder &work_order_proto = proto.work_order(); + QueryContext::aggregation_state_id aggr_state_index; + + 
switch (work_order_proto.work_order_type()) { + case S::AGGREGATION: + aggr_state_index = work_order_proto.GetExtension(S::AggregationWorkOrder::aggr_state_index); + break; + case S::FINALIZE_AGGREGATION: + aggr_state_index = work_order_proto.GetExtension(S::FinalizeAggregationWorkOrder::aggr_state_index); + break; + case S::DESTROY_AGGREGATION_STATE: + aggr_state_index = work_order_proto.GetExtension(S::DestroyAggregationStateWorkOrder::aggr_state_index); + break; + default: + return false; + } + + policy_enforcer_->getShiftbossIndexForAggregation( + proto.query_id(), aggr_state_index, next_shiftboss_index_to_schedule, + shiftboss_index_for_aggregation); + + return true; +} + +bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &proto, const size_t next_shiftboss_index_to_schedule, size_t *shiftboss_index_for_hash_join) { - const S::WorkOrder &work_order_proto = message->work_order(); + const S::WorkOrder &work_order_proto = proto.work_order(); QueryContext::join_hash_table_id join_hash_table_index; switch (work_order_proto.work_order_type()) { case S::BUILD_HASH: - join_hash_table_index = work_order_proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index); - break; - case S::DESTROY_HASH: - join_hash_table_index = work_order_proto.GetExtension(serialization::DestroyHashWorkOrder::join_hash_table_index); + join_hash_table_index = work_order_proto.GetExtension(S::BuildHashWorkOrder::join_hash_table_index); break; case S::HASH_JOIN: - join_hash_table_index = work_order_proto.GetExtension(serialization::HashJoinWorkOrder::join_hash_table_index); + join_hash_table_index = work_order_proto.GetExtension(S::HashJoinWorkOrder::join_hash_table_index); + break; + case S::DESTROY_HASH: + join_hash_table_index = work_order_proto.GetExtension(S::DestroyHashWorkOrder::join_hash_table_index); break; default: return false; } policy_enforcer_->getShiftbossIndexForHashJoin( - message->query_id(), join_hash_table_index, 
next_shiftboss_index_to_schedule, + proto.query_id(), join_hash_table_index, next_shiftboss_index_to_schedule, shiftboss_index_for_hash_join); return true; @@ -275,14 +302,11 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo size_t shiftboss_index = 0u; for (const auto &message : messages) { DCHECK(message != nullptr); - size_t shiftboss_index_for_hash_join; - if (isHashJoinRelatedWorkOrder(message, shiftboss_index, &shiftboss_index_for_hash_join)) { - sendWorkOrderMessage(shiftboss_index_for_hash_join, *message); - shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_hash_join); - - if (shiftboss_index == shiftboss_index_for_hash_join) { - shiftboss_index = (shiftboss_index + 1) % num_shiftbosses; - } + size_t shiftboss_index_for_particular_work_order_type; + if (isAggregationRelatedWorkOrder(*message, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) { + dispatchWorkOrderMessagesHelper(*message, shiftboss_index_for_particular_work_order_type, &shiftboss_index); + } else if (isHashJoinRelatedWorkOrder(*message, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) { + dispatchWorkOrderMessagesHelper(*message, shiftboss_index_for_particular_work_order_type, &shiftboss_index); } else { sendWorkOrderMessage(shiftboss_index, *message); shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index); @@ -293,6 +317,21 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo } } +void ForemanDistributed::dispatchWorkOrderMessagesHelper(const S::WorkOrderMessage &proto, + const size_t shiftboss_index_for_particular_work_order_type, + size_t *shiftboss_index) { + sendWorkOrderMessage(shiftboss_index_for_particular_work_order_type, proto); + shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_particular_work_order_type); + + if (*shiftboss_index == shiftboss_index_for_particular_work_order_type) { + *shiftboss_index = (*shiftboss_index + 1) % 
shiftboss_directory_.size(); + } else { + // NOTE(zuyu): This is not the exact round-robin scheduling, as in this case, + // <shiftboss_index_for_particular_work_order_type> will be scheduled one + // more WorkOrder. + } +} + void ForemanDistributed::sendWorkOrderMessage(const size_t shiftboss_index, const S::WorkOrderMessage &proto) { const size_t proto_length = proto.ByteSize(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a98e4a14/query_execution/ForemanDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp index 55e747f..f6d8597 100644 --- a/query_execution/ForemanDistributed.hpp +++ b/query_execution/ForemanDistributed.hpp @@ -71,7 +71,11 @@ class ForemanDistributed final : public ForemanBase { void run() override; private: - bool isHashJoinRelatedWorkOrder(const std::unique_ptr<serialization::WorkOrderMessage> &message, + bool isAggregationRelatedWorkOrder(const serialization::WorkOrderMessage &proto, + const std::size_t next_shiftboss_index_to_schedule, + std::size_t *shiftboss_index_for_aggregation); + + bool isHashJoinRelatedWorkOrder(const serialization::WorkOrderMessage &proto, const std::size_t next_shiftboss_index_to_schedule, std::size_t *shiftboss_index_for_hash_join); @@ -84,6 +88,9 @@ class ForemanDistributed final : public ForemanBase { void dispatchWorkOrderMessages( const std::vector<std::unique_ptr<serialization::WorkOrderMessage>> &messages); + void dispatchWorkOrderMessagesHelper(const serialization::WorkOrderMessage &proto, + const std::size_t shiftboss_index_for_particular_work_order_type, + std::size_t *shiftboss_index); /** * @brief Send the given message to the specified worker. 
* http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a98e4a14/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp index 86b36c8..c5642bc 100644 --- a/query_execution/PolicyEnforcerDistributed.cpp +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -158,6 +158,18 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb: } } +void PolicyEnforcerDistributed::getShiftbossIndexForAggregation( + const std::size_t query_id, + const QueryContext::aggregation_state_id aggr_state_index, + const std::size_t next_shiftboss_index_to_schedule, + std::size_t *shiftboss_index) { + DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); + QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get()); + query_manager->getShiftbossIndexForAggregation(aggr_state_index, + next_shiftboss_index_to_schedule, + shiftboss_index); +} + void PolicyEnforcerDistributed::getShiftbossIndexForHashJoin( const std::size_t query_id, const QueryContext::join_hash_table_id join_hash_table_index, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a98e4a14/query_execution/PolicyEnforcerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp index 37326bd..e8bc394 100644 --- a/query_execution/PolicyEnforcerDistributed.hpp +++ b/query_execution/PolicyEnforcerDistributed.hpp @@ -90,6 +90,24 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase { void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message); /** + * @brief Get or set the index of Shiftboss for an Aggregation related + * WorkOrder. 
If it is the first Aggregation on <aggr_state_index>, + * <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>. + * Otherwise, <shiftboss_index> will be set to the index of the Shiftboss that + * has executed the first Aggregation. + * + * @param query_id The query id. + * @param aggr_state_index The Hash Table for the Aggregation. + * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder. + * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder. + **/ + void getShiftbossIndexForAggregation( + const std::size_t query_id, + const QueryContext::aggregation_state_id aggr_state_index, + const std::size_t next_shiftboss_index_to_schedule, + std::size_t *shiftboss_index); + + /** * @brief Get or set the index of Shiftboss for a HashJoin related WorkOrder. * If it is the first BuildHash on <join_hash_table_index>, <shiftboss_index> * will be set to <next_shiftboss_index_to_schedule>. Otherwise, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a98e4a14/query_execution/QueryManagerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp index 2b21303..6455bf7 100644 --- a/query_execution/QueryManagerDistributed.hpp +++ b/query_execution/QueryManagerDistributed.hpp @@ -95,6 +95,26 @@ class QueryManagerDistributed final : public QueryManagerBase { const dag_node_index start_operator_index); /** + * @brief Get the index of Shiftboss for an Aggregation related WorkOrder. If + * the Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>. + * + * @param aggr_state_index The Hash Table for the Aggregation. + * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder. + * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder. 
+ **/ + void getShiftbossIndexForAggregation(const QueryContext::aggregation_state_id aggr_state_index, + const std::size_t next_shiftboss_index_to_schedule, + std::size_t *shiftboss_index) { + const auto cit = shiftboss_indexes_for_aggrs_.find(aggr_state_index); + if (cit != shiftboss_indexes_for_aggrs_.end()) { + *shiftboss_index = cit->second; + } else { + shiftboss_indexes_for_aggrs_.emplace(aggr_state_index, next_shiftboss_index_to_schedule); + *shiftboss_index = next_shiftboss_index_to_schedule; + } + } + + /** * @brief Get the index of Shiftboss for a HashJoin related WorkOrder. If the * Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>. * @@ -136,6 +156,9 @@ class QueryManagerDistributed final : public QueryManagerBase { std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_; + // A map from a aggregation to its scheduled Shiftboss index. + std::unordered_map<QueryContext::aggregation_state_id, std::size_t> shiftboss_indexes_for_aggrs_; + // A map from a join hash table to its scheduled Shiftboss index. std::unordered_map<QueryContext::join_hash_table_id, std::size_t> shiftboss_indexes_for_hash_joins_;