Used multiple Shiftbosses in the distributed unit tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c4f4b285 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c4f4b285 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c4f4b285 Branch: refs/heads/min-max-stats Commit: c4f4b285b63e8a3d6bb8fd312f6d00cc8f1cbd45 Parents: 4286d75 Author: Zuyu Zhang <zu...@apache.org> Authored: Sat Nov 12 22:31:42 2016 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Sun Nov 27 11:46:23 2016 -0800 ---------------------------------------------------------------------- query_execution/ForemanDistributed.cpp | 14 +++--- query_execution/PolicyEnforcerDistributed.cpp | 6 ++- query_execution/QueryExecutionState.hpp | 53 ++++++++++++++++++++++ query_execution/QueryManagerDistributed.cpp | 35 ++++++++------ query_execution/QueryManagerDistributed.hpp | 4 +- 5 files changed, 88 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c4f4b285/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp index d619657..7dccce4 100644 --- a/query_execution/ForemanDistributed.cpp +++ b/query_execution/ForemanDistributed.cpp @@ -238,17 +238,19 @@ void ForemanDistributed::run() { bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) { return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type, kCatalogRelationNewBlockMessage, - kWorkOrderFeedbackMessage) && - // TODO(zuyu): Multiple Shiftbosses support. 
- !shiftboss_directory_.hasReachedCapacity(0); + kWorkOrderFeedbackMessage); } void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) { + const size_t num_shiftbosses = shiftboss_directory_.size(); + size_t shiftboss_index = 0u; for (const auto &message : messages) { DCHECK(message != nullptr); - // TODO(zuyu): Multiple Shiftbosses support. - sendWorkOrderMessage(0, *message); - shiftboss_directory_.incrementNumQueuedWorkOrders(0); + sendWorkOrderMessage(shiftboss_index, *message); + shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index); + + // TODO(zuyu): Take data-locality into account for scheduling. + shiftboss_index = (shiftboss_index + 1) % num_shiftbosses; } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c4f4b285/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp index c06fd86..6e09ea8 100644 --- a/query_execution/PolicyEnforcerDistributed.cpp +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -140,8 +140,10 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb: QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get()); const std::size_t num_rebuild_work_orders = proto.num_rebuild_work_orders(); - query_manager->processInitiateRebuildResponseMessage(proto.operator_index(), num_rebuild_work_orders); - shiftboss_directory_->addNumQueuedWorkOrders(proto.shiftboss_index(), num_rebuild_work_orders); + const size_t shiftboss_index = proto.shiftboss_index(); + query_manager->processInitiateRebuildResponseMessage( + proto.operator_index(), num_rebuild_work_orders, shiftboss_index); + shiftboss_directory_->addNumQueuedWorkOrders(shiftboss_index, num_rebuild_work_orders); if 
(query_manager->getQueryExecutionState().hasQueryExecutionFinished()) { onQueryCompletion(query_manager); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c4f4b285/query_execution/QueryExecutionState.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionState.hpp b/query_execution/QueryExecutionState.hpp index f5281d5..a4273dc 100644 --- a/query_execution/QueryExecutionState.hpp +++ b/query_execution/QueryExecutionState.hpp @@ -22,6 +22,12 @@ #include <cstddef> #include <unordered_map> + +#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED. +#ifdef QUICKSTEP_DISTRIBUTED +#include <unordered_set> +#endif // QUICKSTEP_DISTRIBUTED + #include <utility> #include <vector> @@ -103,6 +109,49 @@ class QueryExecutionState { } } +#ifdef QUICKSTEP_DISTRIBUTED + /** + * @brief Update the rebuild status of the given operator with the number of + * pending rebuild work orders, after the rebuild has been initiated. + * + * @param operator_index The index of the given operator. + * @param num_rebuild_workorders The number of rebuild workorders of the given + * operator. + * @param shiftboss_index The index of the Shiftboss that sent the rebuild response. + **/ + void updateRebuildStatus(const std::size_t operator_index, + const std::size_t num_rebuild_workorders, + const std::size_t shiftboss_index) { + DCHECK_LT(operator_index, num_operators_); + auto search_res = rebuild_status_.find(operator_index); + DCHECK(search_res != rebuild_status_.end() && search_res->second.has_initiated); + search_res->second.num_pending_workorders += num_rebuild_workorders; + search_res->second.rebuilt_shiftboss_indexes.insert(shiftboss_index); + } + + /** + * @brief Check if the rebuild has finished for the given operator. + * + * @param operator_index The index of the given operator. + * @param num_shiftbosses The number of Shiftbosses that must report before the rebuild can finish. 
+ * + * @return True if the rebuild has been finished, false otherwise. + **/ + inline bool hasRebuildFinished(const std::size_t operator_index, + const std::size_t num_shiftbosses) const { + DCHECK_LT(operator_index, num_operators_); + const auto search_res = rebuild_status_.find(operator_index); + DCHECK(search_res != rebuild_status_.end()); + + const auto &rebuild_status = search_res->second; + DCHECK(rebuild_status.has_initiated); + + return rebuild_status.rebuilt_shiftboss_indexes.size() == num_shiftbosses && + rebuild_status.num_pending_workorders == 0u; + } + +#endif // QUICKSTEP_DISTRIBUTED + /** * @brief Check if the rebuild has been initiated for the given operator. * @@ -314,6 +363,10 @@ class QueryExecutionState { // The number of pending rebuild workorders for the operator. // Valid if and only if 'has_initiated' is true. std::size_t num_pending_workorders; + +#ifdef QUICKSTEP_DISTRIBUTED + std::unordered_set<std::size_t> rebuilt_shiftboss_indexes; +#endif // QUICKSTEP_DISTRIBUTED }; // Key is dag_node_index for which rebuild is required. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c4f4b285/query_execution/QueryManagerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp index 7d45933..20650d0 100644 --- a/query_execution/QueryManagerDistributed.cpp +++ b/query_execution/QueryManagerDistributed.cpp @@ -36,7 +36,9 @@ #include "glog/logging.h" +#include "tmb/address.h" #include "tmb/id_typedefs.h" +#include "tmb/tagged_message.h" using std::free; using std::malloc; @@ -125,16 +127,16 @@ bool QueryManagerDistributed::fetchNormalWorkOrders(const dag_node_index index) } void QueryManagerDistributed::processInitiateRebuildResponseMessage(const dag_node_index op_index, - const std::size_t num_rebuild_work_orders) { - // TODO(zuyu): Multiple Shiftbosses support. 
- query_exec_state_->setRebuildStatus(op_index, num_rebuild_work_orders, true); + const std::size_t num_rebuild_work_orders, + const std::size_t shiftboss_index) { + query_exec_state_->updateRebuildStatus(op_index, num_rebuild_work_orders, shiftboss_index); - if (num_rebuild_work_orders != 0u) { + if (!query_exec_state_->hasRebuildFinished(op_index, shiftboss_directory_->size())) { // Wait for the rebuild work orders to finish. return; } - // No needs for rebuilds. + // No rebuild is needed, or the rebuild has finished. markOperatorFinished(op_index); for (const std::pair<dag_node_index, bool> &dependent_link : @@ -168,17 +170,20 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) { kInitiateRebuildMessage); free(proto_bytes); + // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses. + tmb::Address shiftboss_addresses; + for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) { + shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i)); + } + LOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage - << "') to Shiftboss"; - // TODO(zuyu): Multiple workers support. - QueryExecutionUtil::SendTMBMessage(bus_, - foreman_client_id_, - shiftboss_directory_->getClientId(0), - move(tagged_msg)); - - // The negative value indicates that the number of rebuild work orders is to be - // determined. - query_exec_state_->setRebuildStatus(index, -1, true); + << "') to all Shiftbosses"; + QueryExecutionUtil::BroadcastMessage(foreman_client_id_, + shiftboss_addresses, + move(tagged_msg), + bus_); + + query_exec_state_->setRebuildStatus(index, 0, true); // Wait for Shiftbosses to report the number of rebuild work orders. 
return false; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c4f4b285/query_execution/QueryManagerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp index e609ab8..f8ac53c 100644 --- a/query_execution/QueryManagerDistributed.hpp +++ b/query_execution/QueryManagerDistributed.hpp @@ -73,9 +73,11 @@ class QueryManagerDistributed final : public QueryManagerBase { * for initiating the rebuild work order. * @param num_rebuild_work_orders The number of the rebuild work orders * generated for the operator indexed by 'op_index'. + * @param shiftboss_index The index of the Shiftboss that sends the message. **/ void processInitiateRebuildResponseMessage(const dag_node_index op_index, - const std::size_t num_rebuild_work_orders); + const std::size_t num_rebuild_work_orders, + const std::size_t shiftboss_index); /** * @brief Get the next normal workorder to be excuted, wrapped in a