Schedule HashJoinWorkOrder on the same Shiftboss as BuildHashWorkOrder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e75c265e Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e75c265e Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e75c265e Branch: refs/heads/multi-shiftboss-test Commit: e75c265eb279256d89f4fa634c542d566ce3cd7a Parents: c4f4b28 Author: Zuyu Zhang <zu...@apache.org> Authored: Sun Nov 13 20:20:56 2016 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Sun Nov 27 16:45:22 2016 -0800 ---------------------------------------------------------------------- query_execution/CMakeLists.txt | 3 ++ query_execution/ForemanDistributed.cpp | 47 ++++++++++++++++++++-- query_execution/ForemanDistributed.hpp | 4 ++ query_execution/PolicyEnforcerDistributed.cpp | 12 ++++++ query_execution/PolicyEnforcerDistributed.hpp | 19 +++++++++ query_execution/QueryManagerDistributed.hpp | 25 ++++++++++++ 6 files changed, 106 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index 719d9f3..1f7add8 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -108,10 +108,12 @@ if (ENABLE_DISTRIBUTED) quickstep_queryexecution_ForemanBase quickstep_queryexecution_PolicyEnforcerBase quickstep_queryexecution_PolicyEnforcerDistributed + quickstep_queryexecution_QueryContext quickstep_queryexecution_QueryExecutionMessages_proto quickstep_queryexecution_QueryExecutionTypedefs quickstep_queryexecution_QueryExecutionUtil quickstep_queryexecution_ShiftbossDirectory + quickstep_relationaloperators_WorkOrder_proto quickstep_threading_ThreadUtil quickstep_utility_EqualsAnyConstant 
quickstep_utility_Macros @@ -153,6 +155,7 @@ if (ENABLE_DISTRIBUTED) quickstep_catalog_CatalogRelation quickstep_catalog_Catalog_proto quickstep_queryexecution_PolicyEnforcerBase + quickstep_queryexecution_QueryContext quickstep_queryexecution_QueryContext_proto quickstep_queryexecution_QueryExecutionMessages_proto quickstep_queryexecution_QueryExecutionState http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp index 7dccce4..fc9cd3c 100644 --- a/query_execution/ForemanDistributed.cpp +++ b/query_execution/ForemanDistributed.cpp @@ -30,10 +30,12 @@ #include "query_execution/AdmitRequestMessage.hpp" #include "query_execution/PolicyEnforcerBase.hpp" #include "query_execution/PolicyEnforcerDistributed.hpp" +#include "query_execution/QueryContext.hpp" #include "query_execution/QueryExecutionMessages.pb.h" #include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/QueryExecutionUtil.hpp" #include "query_execution/ShiftbossDirectory.hpp" +#include "relational_operators/WorkOrder.pb.h" #include "threading/ThreadUtil.hpp" #include "utility/EqualsAnyConstant.hpp" @@ -241,16 +243,53 @@ bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id messag kWorkOrderFeedbackMessage); } +bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &proto, + const size_t next_shiftboss_index_to_schedule, + size_t *shiftboss_index_for_hash_join) { + const S::WorkOrder &work_order_proto = proto.work_order(); + QueryContext::join_hash_table_id join_hash_table_index; + + switch (work_order_proto.work_order_type()) { + case S::BUILD_HASH: + join_hash_table_index = work_order_proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index); + break; + case S::DESTROY_HASH: + join_hash_table_index = 
work_order_proto.GetExtension(serialization::DestroyHashWorkOrder::join_hash_table_index); + break; + case S::HASH_JOIN: + join_hash_table_index = work_order_proto.GetExtension(serialization::HashJoinWorkOrder::join_hash_table_index); + break; + default: + return false; + } + + static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForHashJoin( + proto.query_id(), join_hash_table_index, next_shiftboss_index_to_schedule, shiftboss_index_for_hash_join); + + return true; +} + void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) { const size_t num_shiftbosses = shiftboss_directory_.size(); size_t shiftboss_index = 0u; for (const auto &message : messages) { DCHECK(message != nullptr); - sendWorkOrderMessage(shiftboss_index, *message); - shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index); + const S::WorkOrderMessage &proto = *message; + size_t shiftboss_index_for_hash_join; + if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_hash_join)) { + sendWorkOrderMessage(shiftboss_index_for_hash_join, proto); + shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_hash_join); + + if (shiftboss_index == shiftboss_index_for_hash_join) { + shiftboss_index = (shiftboss_index + 1) % num_shiftbosses; + } + } else { + sendWorkOrderMessage(shiftboss_index, proto); + shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index); - // TO(zuyu): Take data-locality into account for scheduling. - shiftboss_index = (shiftboss_index + 1) % num_shiftbosses; + // TODO(zuyu): Take data-locality into account for scheduling. 
+ shiftboss_index = (shiftboss_index + 1) % num_shiftbosses; + } } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/ForemanDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp index ccdd0ae..0616f30 100644 --- a/query_execution/ForemanDistributed.hpp +++ b/query_execution/ForemanDistributed.hpp @@ -71,6 +71,10 @@ class ForemanDistributed final : public ForemanBase { void run() override; private: + bool isHashJoinRelatedWorkOrder(const serialization::WorkOrderMessage &proto, + const std::size_t next_shiftboss_index_to_schedule, + std::size_t *shiftboss_index_for_hash_join); + /** * @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages to the * worker threads. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp index 6e09ea8..86b36c8 100644 --- a/query_execution/PolicyEnforcerDistributed.cpp +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -158,6 +158,18 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb: } } +void PolicyEnforcerDistributed::getShiftbossIndexForHashJoin( + const std::size_t query_id, + const QueryContext::join_hash_table_id join_hash_table_index, + const std::size_t next_shiftboss_index_to_schedule, + std::size_t *shiftboss_index) { + DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); + QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get()); + query_manager->getShiftbossIndexForHashJoin(join_hash_table_index, + next_shiftboss_index_to_schedule, + shiftboss_index); +} + void 
PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_handle) { S::QueryInitiateMessage proto; proto.set_query_id(query_handle->query_id()); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/PolicyEnforcerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp index 146e9af..37326bd 100644 --- a/query_execution/PolicyEnforcerDistributed.hpp +++ b/query_execution/PolicyEnforcerDistributed.hpp @@ -20,6 +20,7 @@ #include <vector> #include "query_execution/PolicyEnforcerBase.hpp" +#include "query_execution/QueryContext.hpp" #include "query_execution/QueryExecutionMessages.pb.h" #include "query_execution/ShiftbossDirectory.hpp" #include "utility/Macros.hpp" @@ -88,6 +89,24 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase { **/ void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message); + /** + * @brief Get or set the index of Shiftboss for a HashJoin related WorkOrder. + * If it is the first BuildHash on <join_hash_table_index>, <shiftboss_index> + * will be set to <next_shiftboss_index_to_schedule>. Otherwise, + * <shiftboss_index> will be set to the index of the Shiftboss that has + * executed the first BuildHash. + * + * @param query_id The query id. + * @param join_hash_table_index The Hash Table for the Join. + * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder. + * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder. 
+ **/ + void getShiftbossIndexForHashJoin( + const std::size_t query_id, + const QueryContext::join_hash_table_id join_hash_table_index, + const std::size_t next_shiftboss_index_to_schedule, + std::size_t *shiftboss_index); + private: void decrementNumQueuedWorkOrders(const serialization::WorkOrderCompletionMessage &proto) override { shiftboss_directory_->decrementNumQueuedWorkOrders(proto.shiftboss_index()); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e75c265e/query_execution/QueryManagerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp index f8ac53c..2b21303 100644 --- a/query_execution/QueryManagerDistributed.hpp +++ b/query_execution/QueryManagerDistributed.hpp @@ -22,7 +22,9 @@ #include <cstddef> #include <memory> +#include <unordered_map> +#include "query_execution/QueryContext.hpp" #include "query_execution/QueryExecutionState.hpp" #include "query_execution/QueryManagerBase.hpp" #include "query_execution/WorkOrderProtosContainer.hpp" @@ -92,6 +94,26 @@ class QueryManagerDistributed final : public QueryManagerBase { serialization::WorkOrderMessage* getNextWorkOrderMessage( const dag_node_index start_operator_index); + /** + * @brief Get the index of Shiftboss for a HashJoin related WorkOrder. If the + * Shiftboss index is not found, set using <next_shiftboss_index_to_schedule>. + * + * @param join_hash_table_index The Hash Table for the Join. + * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder. + * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder. 
+ **/ + void getShiftbossIndexForHashJoin(const QueryContext::join_hash_table_id join_hash_table_index, + const std::size_t next_shiftboss_index_to_schedule, + std::size_t *shiftboss_index) { + const auto cit = shiftboss_indexes_for_hash_joins_.find(join_hash_table_index); + if (cit != shiftboss_indexes_for_hash_joins_.end()) { + *shiftboss_index = cit->second; + } else { + shiftboss_indexes_for_hash_joins_.emplace(join_hash_table_index, next_shiftboss_index_to_schedule); + *shiftboss_index = next_shiftboss_index_to_schedule; + } + } + private: bool checkNormalExecutionOver(const dag_node_index index) const override { return (checkAllDependenciesMet(index) && @@ -114,6 +136,9 @@ class QueryManagerDistributed final : public QueryManagerBase { std::unique_ptr<WorkOrderProtosContainer> normal_workorder_protos_container_; + // A map from a join hash table to its scheduled Shiftboss index. + std::unordered_map<QueryContext::join_hash_table_id, std::size_t> shiftboss_indexes_for_hash_joins_; + DISALLOW_COPY_AND_ASSIGN(QueryManagerDistributed); };