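Refactored '\analyze' in the distributed version: the Conductor now builds all ANALYZE query handles up front and admits them to the Foreman in a single AdmitRequestMessage, instead of sending one admit request per query.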
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/6ec765cb Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/6ec765cb Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/6ec765cb Branch: refs/heads/dist-patch Commit: 6ec765cb037230210f1a358dda47b85dd1305fe1 Parents: d383591 Author: Zuyu Zhang <zu...@apache.org> Authored: Mon Mar 13 01:25:29 2017 -0700 Committer: Zuyu Zhang <zu...@apache.org> Committed: Mon Mar 13 02:23:32 2017 -0700 ---------------------------------------------------------------------- cli/distributed/Conductor.cpp | 75 ++++++++++++++++++++++++-------------- cli/distributed/Conductor.hpp | 3 -- 2 files changed, 48 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6ec765cb/cli/distributed/Conductor.cpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp index ef253f1..8b0ba03 100644 --- a/cli/distributed/Conductor.cpp +++ b/cli/distributed/Conductor.cpp @@ -210,11 +210,9 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm query_processor_->generateQueryHandle(statement, query_handle.get()); DCHECK(query_handle->getQueryPlanMutable() != nullptr); - QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( - conductor_client_id_, - foreman_->getBusClientID(), - query_handle.release(), - &bus_); + CHECK(MessageBus::SendStatus::kOK == + QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( + conductor_client_id_, foreman_->getBusClientID(), query_handle.release(), &bus_)); } } catch (const SqlError &sql_error) { // Set the query execution status along with the error message. 
@@ -252,11 +250,16 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars } } + SqlParserWrapper parser_wrapper; + std::vector<QueryHandle*> query_handles; + // Analyze each relation in the database. for (const CatalogRelation &relation : relations) { const relation_id rel_id = relation.getID(); const string rel_name = EscapeQuotes(relation.getName(), '"'); + string *query_string = nullptr; + // Get the number of distinct values for each column. for (const CatalogAttribute &attribute : relation) { const string attr_name = EscapeQuotes(attribute.getName(), '"'); @@ -269,7 +272,7 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars // NOTE(jianqiao): Note that the relation name and the attribute names may // contain non-letter characters, e.g. CREATE TABLE "with space"("1" int). // So here we need to format the names with double quotes ("). - string *query_string = new string("SELECT COUNT(DISTINCT \""); + query_string = new string("SELECT COUNT(DISTINCT \""); query_string->append(attr_name); query_string->append("\")"); if (is_min_applicable) { @@ -286,37 +289,55 @@ void Conductor::executeAnalyze(const tmb::client_id sender, const PtrVector<Pars query_string->append(rel_name); query_string->append("\";"); - submitQuery(sender, query_string, - new QueryHandle::AnalyzeQueryInfo(true /* is_analyze_attribute_query */, rel_id, relations.size(), - attribute.getID(), is_min_applicable, is_max_applicable)); + parser_wrapper.feedNextBuffer(query_string); + const ParseResult parse_result = parser_wrapper.getNextStatement(); + DCHECK_EQ(ParseResult::kSuccess, parse_result.condition); + + const ParseStatement &statement = *parse_result.parsed_statement; + + // Generate the query plan. 
+ auto query_handle = + make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(), + new QueryHandle::AnalyzeQueryInfo(true /* is_analyze_attribute_query */, rel_id, + relations.size(), attribute.getID(), + is_min_applicable, is_max_applicable)); + query_processor_->generateQueryHandle(statement, query_handle.get()); + DCHECK(query_handle->getQueryPlanMutable() != nullptr); + + query_handles.push_back(query_handle.release()); } // Get the number of tuples for the relation. - string *query_string = new string("SELECT COUNT(*) FROM \""); + query_string = new string("SELECT COUNT(*) FROM \""); query_string->append(rel_name); query_string->append("\";"); - submitQuery(sender, query_string, - new QueryHandle::AnalyzeQueryInfo(false /* is_analyze_attribute_query */, rel_id, relations.size())); - } -} + parser_wrapper.feedNextBuffer(query_string); + const ParseResult parse_result = parser_wrapper.getNextStatement(); + DCHECK_EQ(ParseResult::kSuccess, parse_result.condition); -void Conductor::submitQuery(const tmb::client_id sender, string *query, QueryHandle::AnalyzeQueryInfo *query_info) { - SqlParserWrapper parser_wrapper; - parser_wrapper.feedNextBuffer(query); - ParseResult parse_result = parser_wrapper.getNextStatement(); - DCHECK_EQ(ParseResult::kSuccess, parse_result.condition); + const ParseStatement &statement = *parse_result.parsed_statement; - const ParseStatement &statement = *parse_result.parsed_statement; + // Generate the query plan. + auto query_handle = + make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(), + new QueryHandle::AnalyzeQueryInfo(false /* is_analyze_attribute_query */, rel_id, + relations.size())); + query_processor_->generateQueryHandle(statement, query_handle.get()); + DCHECK(query_handle->getQueryPlanMutable() != nullptr); - // Generate the query plan. 
- auto query_handle = - make_unique<QueryHandle>(query_processor_->query_id(), sender, statement.getPriority(), query_info); - query_processor_->generateQueryHandle(statement, query_handle.get()); - DCHECK(query_handle->getQueryPlanMutable() != nullptr); + query_handles.push_back(query_handle.release()); + } + + if (!query_handles.empty()) { + auto request_message = make_unique<AdmitRequestMessage>(query_handles); + const std::size_t size_of_request_msg = sizeof(*request_message); - QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( - conductor_client_id_, foreman_->getBusClientID(), query_handle.release(), &bus_); + CHECK(MessageBus::SendStatus::kOK == + QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, foreman_->getBusClientID(), + TaggedMessage(request_message.release(), size_of_request_msg, + kAdmitRequestMessage))); + } } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6ec765cb/cli/distributed/Conductor.hpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp index 28c8e02..12c5b52 100644 --- a/cli/distributed/Conductor.hpp +++ b/cli/distributed/Conductor.hpp @@ -26,7 +26,6 @@ #include "cli/distributed/Role.hpp" #include "query_execution/BlockLocator.hpp" #include "query_execution/ForemanDistributed.hpp" -#include "query_optimizer/QueryHandle.hpp" #include "query_optimizer/QueryProcessor.hpp" #include "utility/Macros.hpp" #include "utility/PtrVector.hpp" @@ -66,8 +65,6 @@ class Conductor final : public Role { void executeAnalyze(const tmb::client_id sender, const PtrVector<ParseString> &arguments); - void submitQuery(const tmb::client_id sender, std::string *query, QueryHandle::AnalyzeQueryInfo *query_info); - std::unique_ptr<QueryProcessor> query_processor_; // Not owned. CatalogDatabase *catalog_database_;