Minor refactored distributed query execution.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3011ddf6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3011ddf6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3011ddf6 Branch: refs/heads/tmb_poll_interval Commit: 3011ddf61ec92efcb833ef0a1168255ff97fb9f9 Parents: 5773027 Author: Zuyu Zhang <zu...@apache.org> Authored: Wed Feb 8 17:36:45 2017 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Wed Feb 8 17:42:42 2017 -0800 ---------------------------------------------------------------------- query_execution/ForemanDistributed.cpp | 1 - query_execution/PolicyEnforcerBase.cpp | 2 - query_execution/PolicyEnforcerBase.hpp | 14 ----- query_execution/PolicyEnforcerDistributed.cpp | 59 ++++++++++------------ 4 files changed, 27 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp index 4d95f16..8c20e65 100644 --- a/query_execution/ForemanDistributed.cpp +++ b/query_execution/ForemanDistributed.cpp @@ -175,7 +175,6 @@ void ForemanDistributed::run() { case kQueryInitiateResponseMessage: { S::QueryInitiateResponseMessage proto; CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - CHECK(policy_enforcer_->existQuery(proto.query_id())); break; } case kCatalogRelationNewBlockMessage: // Fall through http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/PolicyEnforcerBase.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp index a26b84e..082f6e9 100644 --- a/query_execution/PolicyEnforcerBase.cpp +++ b/query_execution/PolicyEnforcerBase.cpp @@ -156,8 +156,6 @@ void PolicyEnforcerBase::removeQuery(const std::size_t query_id) { << " that hasn't finished its execution"; } admitted_queries_.erase(query_id); - - removed_query_ids_.insert(query_id); } bool PolicyEnforcerBase::admitQueries( http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/PolicyEnforcerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp index baf9c68..4107817 100644 --- a/query_execution/PolicyEnforcerBase.hpp +++ b/query_execution/PolicyEnforcerBase.hpp @@ -103,16 +103,6 @@ class PolicyEnforcerBase { void processMessage(const TaggedMessage &tagged_message); /** - * @brief Check if the given query id ever exists. - * - * @return True if the query ever exists, otherwise false. - **/ - inline bool existQuery(const std::size_t query_id) const { - return admitted_queries_.find(query_id) != admitted_queries_.end() || - removed_query_ids_.find(query_id) != removed_query_ids_.end(); - } - - /** * @brief Check if there are any queries to be executed. * * @return True if there is at least one active or waiting query, false if @@ -179,10 +169,6 @@ class PolicyEnforcerBase { // Key = query ID, value = QueryManagerBase* for the key query. std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_; - // TODO(quickstep-team): Delete a 'query_id' after receiving all - // 'QueryInitiateResponseMessage's for the 'query_id'. - std::unordered_set<std::size_t> removed_query_ids_; - // The queries which haven't been admitted yet. std::queue<QueryHandle*> waiting_queries_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3011ddf6/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp index 49a1d9a..ef5abb0 100644 --- a/query_execution/PolicyEnforcerDistributed.cpp +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -68,8 +68,15 @@ void PolicyEnforcerDistributed::getWorkOrderProtoMessages( // TODO(harshad) - Make this function generic enough so that it // works well when multiple queries are getting executed. if (admitted_queries_.empty()) { - LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running"; - return; + if (waiting_queries_.empty()) { + LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running"; + return; + } else { + // Admit the earliest waiting query. + QueryHandle *new_query = waiting_queries_.front(); + waiting_queries_.pop(); + admitQuery(new_query); + } } const std::size_t per_query_share = @@ -106,28 +113,28 @@ void PolicyEnforcerDistributed::getWorkOrderProtoMessages( } bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) { - if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) { - // Ok to admit the query. - const std::size_t query_id = query_handle->query_id(); - if (admitted_queries_.find(query_id) == admitted_queries_.end()) { - // NOTE(zuyu): Should call before constructing a 'QueryManager'. - // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext' - // initializes. - initiateQueryInShiftboss(query_handle); - - // Query with the same ID not present, ok to admit. - admitted_queries_[query_id].reset( - new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_)); - return true; - } else { - LOG(ERROR) << "Query with the same ID " << query_id << " exists"; - return false; - } - } else { + if (admitted_queries_.size() >= PolicyEnforcerBase::kMaxConcurrentQueries) { // This query will have to wait. waiting_queries_.push(query_handle); return false; } + + const std::size_t query_id = query_handle->query_id(); + if (admitted_queries_.find(query_id) != admitted_queries_.end()) { + LOG(ERROR) << "Query with the same ID " << query_id << " exists"; + return false; + } + + // Ok to admit the query. + // NOTE(zuyu): Should call before constructing a 'QueryManager'. + // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext' + // initializes. + initiateQueryInShiftboss(query_handle); + + // Query with the same ID not present, ok to admit. + admitted_queries_[query_id].reset( + new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_)); + return true; } void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message) { @@ -144,18 +151,6 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb: query_manager->processInitiateRebuildResponseMessage( proto.operator_index(), num_rebuild_work_orders, shiftboss_index); shiftboss_directory_->addNumQueuedWorkOrders(shiftboss_index, num_rebuild_work_orders); - - if (query_manager->getQueryExecutionState().hasQueryExecutionFinished()) { - onQueryCompletion(query_manager); - - removeQuery(query_id); - if (!waiting_queries_.empty()) { - // Admit the earliest waiting query. - QueryHandle *new_query = waiting_queries_.front(); - waiting_queries_.pop(); - admitQuery(new_query); - } - } } void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(