Transferred the ownership of QueryHandle to QueryManager.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c608e99e Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c608e99e Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c608e99e Branch: refs/heads/q-handle-owner Commit: c608e99eff883afacafa2f0fefce99a0513f9963 Parents: 57730e4 Author: Zuyu Zhang <zu...@apache.org> Authored: Sun Nov 20 12:04:24 2016 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Sun Nov 27 18:00:34 2016 -0800 ---------------------------------------------------------------------- cli/CommandExecutor.cpp | 30 +++++++------ cli/QuickstepCli.cpp | 46 ++++++++++---------- cli/tests/CommandExecutorTestRunner.cpp | 31 +++++++------ query_execution/QueryManagerBase.hpp | 7 ++- .../tests/QueryManagerSingleNode_unittest.cpp | 8 ++-- .../DistributedExecutionGeneratorTestRunner.cpp | 34 ++++++++------- .../tests/ExecutionGeneratorTestRunner.cpp | 35 ++++++++------- 7 files changed, 99 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/cli/CommandExecutor.cpp ---------------------------------------------------------------------- diff --git a/cli/CommandExecutor.cpp b/cli/CommandExecutor.cpp index 4ab32de..3c510e7 100644 --- a/cli/CommandExecutor.cpp +++ b/cli/CommandExecutor.cpp @@ -213,25 +213,27 @@ inline std::vector<TypedValue> executeQueryForSingleRow( DCHECK(result.condition == ParseResult::kSuccess); const ParseStatement &statement = *result.parsed_statement; + const CatalogRelation *query_result_relation = nullptr; - // Generate the query plan. 
- std::unique_ptr<QueryHandle> query_handle( - std::make_unique<QueryHandle>(query_processor->query_id(), - main_thread_client_id, - statement.getPriority())); - query_processor->generateQueryHandle(statement, query_handle.get()); - DCHECK(query_handle->getQueryPlanMutable() != nullptr); - - // Use foreman to execute the query plan. - QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( - main_thread_client_id, foreman_client_id, query_handle.get(), bus); + { + // Generate the query plan. + auto query_handle = + std::make_unique<QueryHandle>(query_processor->query_id(), + main_thread_client_id, + statement.getPriority()); + query_processor->generateQueryHandle(statement, query_handle.get()); + DCHECK(query_handle->getQueryPlanMutable() != nullptr); + query_result_relation = query_handle->getQueryResultRelation(); + DCHECK(query_result_relation != nullptr); + + // Use foreman to execute the query plan. + QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( + main_thread_client_id, foreman_client_id, query_handle.release(), bus); + } QueryExecutionUtil::ReceiveQueryCompletionMessage(main_thread_client_id, bus); // Retrieve the scalar result from the result relation. 
- const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation(); - DCHECK(query_result_relation != nullptr); - std::vector<TypedValue> values; { std::vector<block_id> blocks = query_result_relation->getBlocksSnapshot(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/cli/QuickstepCli.cpp ---------------------------------------------------------------------- diff --git a/cli/QuickstepCli.cpp b/cli/QuickstepCli.cpp index f4816a8..9db7577 100644 --- a/cli/QuickstepCli.cpp +++ b/cli/QuickstepCli.cpp @@ -341,38 +341,41 @@ int main(int argc, char* argv[]) { continue; } - std::unique_ptr<QueryHandle> query_handle( - std::make_unique<QueryHandle>(query_processor->query_id(), - main_thread_client_id, - statement.getPriority())); + const std::size_t query_id = query_processor->query_id(); + const CatalogRelation *query_result_relation = nullptr; + std::unique_ptr<quickstep::ExecutionDAGVisualizer> dag_visualizer; + try { + auto query_handle = std::make_unique<QueryHandle>(query_id, + main_thread_client_id, + statement.getPriority()); query_processor->generateQueryHandle(statement, query_handle.get()); + DCHECK(query_handle->getQueryPlanMutable() != nullptr); + + if (quickstep::FLAGS_visualize_execution_dag) { + dag_visualizer = + std::make_unique<quickstep::ExecutionDAGVisualizer>(*query_handle->getQueryPlanMutable()); + } + + query_result_relation = query_handle->getQueryResultRelation(); + + start = std::chrono::steady_clock::now(); + QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( + main_thread_client_id, + foreman.getBusClientID(), + query_handle.release(), + &bus); } catch (const quickstep::SqlError &sql_error) { fprintf(stderr, "%s", sql_error.formatMessage(*command_string).c_str()); reset_parser = true; break; } - DCHECK(query_handle->getQueryPlanMutable() != nullptr); - std::unique_ptr<quickstep::ExecutionDAGVisualizer> dag_visualizer; - if (quickstep::FLAGS_visualize_execution_dag) { - dag_visualizer.reset( - 
new quickstep::ExecutionDAGVisualizer(*query_handle->getQueryPlanMutable())); - } - - start = std::chrono::steady_clock::now(); - QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( - main_thread_client_id, - foreman.getBusClientID(), - query_handle.get(), - &bus); - try { QueryExecutionUtil::ReceiveQueryCompletionMessage( main_thread_client_id, &bus); end = std::chrono::steady_clock::now(); - const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation(); if (query_result_relation) { PrintToScreen::PrintRelation(*query_result_relation, &storage_manager, @@ -394,12 +397,11 @@ int main(int argc, char* argv[]) { time_ms.count(), 3).c_str()); if (quickstep::FLAGS_profile_and_report_workorder_perf) { // TODO(harshad) - Allow user specified file instead of stdout. - foreman.printWorkOrderProfilingResults(query_handle->query_id(), - stdout); + foreman.printWorkOrderProfilingResults(query_id, stdout); } if (quickstep::FLAGS_visualize_execution_dag) { const auto &profiling_stats = - foreman.getWorkOrderProfilingResults(query_handle->query_id()); + foreman.getWorkOrderProfilingResults(query_id); dag_visualizer->bindProfilingStats(profiling_stats); std::cerr << "\n" << dag_visualizer->toDOT() << "\n"; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/cli/tests/CommandExecutorTestRunner.cpp ---------------------------------------------------------------------- diff --git a/cli/tests/CommandExecutorTestRunner.cpp b/cli/tests/CommandExecutorTestRunner.cpp index 41cc9da..e352b24 100644 --- a/cli/tests/CommandExecutorTestRunner.cpp +++ b/cli/tests/CommandExecutorTestRunner.cpp @@ -20,6 +20,7 @@ #include "cli/tests/CommandExecutorTestRunner.hpp" #include <cstdio> +#include <memory> #include <set> #include <string> #include <utility> @@ -88,28 +89,26 @@ void CommandExecutorTestRunner::runTestCase( nullptr, output_stream.file()); } else { - QueryHandle query_handle(0 /* query_id */, main_thread_client_id_); - O::OptimizerContext 
optimizer_context; - - optimizer_.generateQueryHandle(parse_statement, - test_database_loader_.catalog_database(), - &optimizer_context, - &query_handle); - - AdmitRequestMessage request_message(&query_handle); - TaggedMessage admit_tagged_message( - &request_message, sizeof(request_message), kAdmitRequestMessage); - QueryExecutionUtil::SendTMBMessage(&bus_, - main_thread_client_id_, - foreman_->getBusClientID(), - std::move(admit_tagged_message)); + const CatalogRelation *query_result_relation = nullptr; + { + auto query_handle = std::make_unique<QueryHandle>(0 /* query_id */, main_thread_client_id_); + O::OptimizerContext optimizer_context; + + optimizer_.generateQueryHandle(parse_statement, + test_database_loader_.catalog_database(), + &optimizer_context, + query_handle.get()); + query_result_relation = query_handle->getQueryResultRelation(); + + QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( + main_thread_client_id_, foreman_->getBusClientID(), query_handle.release(), &bus_); + } // Receive workload completion message from Foreman. 
const AnnotatedMessage annotated_msg = bus_.Receive(main_thread_client_id_, 0, true); const TaggedMessage &tagged_message = annotated_msg.tagged_message; DCHECK_EQ(kWorkloadCompletionMessage, tagged_message.message_type()); - const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation(); if (query_result_relation) { PrintToScreen::PrintRelation(*query_result_relation, test_database_loader_.storage_manager(), http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/query_execution/QueryManagerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp index a274742..ddb76d5 100644 --- a/query_execution/QueryManagerBase.hpp +++ b/query_execution/QueryManagerBase.hpp @@ -26,6 +26,7 @@ #include "catalog/CatalogTypedefs.hpp" #include "query_execution/QueryExecutionState.hpp" +#include "query_optimizer/QueryHandle.hpp" #include "relational_operators/RelationalOperator.hpp" #include "relational_operators/WorkOrder.hpp" #include "storage/StorageBlockInfo.hpp" @@ -34,8 +35,6 @@ namespace quickstep { -class QueryHandle; - /** \addtogroup QueryExecution * @{ */ @@ -77,7 +76,7 @@ class QueryManagerBase { * @brief Get the query handle. 
**/ const QueryHandle* query_handle() const { - return query_handle_; + return query_handle_.get(); } /** @@ -259,7 +258,7 @@ class QueryManagerBase { return query_exec_state_->hasRebuildInitiated(index); } - const QueryHandle *query_handle_; + std::unique_ptr<QueryHandle> query_handle_; const std::size_t query_id_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/query_execution/tests/QueryManagerSingleNode_unittest.cpp ---------------------------------------------------------------------- diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp index f65ec53..6ec6521 100644 --- a/query_execution/tests/QueryManagerSingleNode_unittest.cpp +++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp @@ -234,14 +234,14 @@ class QueryManagerTest : public ::testing::Test { db_.reset(new CatalogDatabase(nullptr /* catalog */, "database")); storage_manager_.reset(new StorageManager("./")); bus_.Initialize(); - query_handle_.reset(new QueryHandle(0 /* dummy query ID */, tmb::kClientIdNone /* cli_id */)); + query_handle_ = new QueryHandle(0 /* dummy query ID */, tmb::kClientIdNone /* cli_id */); query_plan_ = query_handle_->getQueryPlanMutable(); query_handle_->getQueryContextProtoMutable()->set_query_id(query_handle_->query_id()); } inline void constructQueryManager() { query_manager_.reset(new QueryManagerSingleNode( - 0, 1, query_handle_.get(), db_.get(), storage_manager_.get(), &bus_)); + 0, 1, query_handle_, db_.get(), storage_manager_.get(), &bus_)); } inline const int getNumWorkOrdersInExecution(const QueryPlan::DAGNodeIndex index) const { @@ -291,8 +291,8 @@ class QueryManagerTest : public ::testing::Test { unique_ptr<CatalogDatabase> db_; unique_ptr<StorageManager> storage_manager_; - QueryPlan *query_plan_; - unique_ptr<QueryHandle> query_handle_; + QueryPlan *query_plan_; // Owned by 'query_handle_'. + QueryHandle* query_handle_; // Owned by 'query_manager_'. 
unique_ptr<QueryManagerSingleNode> query_manager_; MessageBusImpl bus_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp index 2351dcd..5100651 100644 --- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp +++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp @@ -160,37 +160,39 @@ void DistributedExecutionGeneratorTestRunner::runTestCase( const ParseStatement &parse_statement = *result.parsed_statement; std::printf("%s\n", parse_statement.toString().c_str()); + + const CatalogRelation *query_result_relation = nullptr; try { OptimizerContext optimizer_context; - QueryHandle query_handle(query_id_++, cli_id_); + auto query_handle = std::make_unique<QueryHandle>(query_id_++, cli_id_); optimizer_.generateQueryHandle(parse_statement, test_database_loader_->catalog_database(), &optimizer_context, - &query_handle); + query_handle.get()); + query_result_relation = query_handle->getQueryResultRelation(); QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( cli_id_, foreman_->getBusClientID(), - &query_handle, + query_handle.release(), &bus_); - - const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true); - DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type()); - - const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation(); - if (query_result_relation) { - PrintToScreen::PrintRelation(*query_result_relation, - test_database_loader_->storage_manager(), - output_stream.file()); - DropRelation::Drop(*query_result_relation, - test_database_loader_->catalog_database(), - test_database_loader_->storage_manager()); - } } catch (const SqlError &error) { *output 
= error.formatMessage(input); break; } + + const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true); + DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type()); + + if (query_result_relation) { + PrintToScreen::PrintRelation(*query_result_relation, + test_database_loader_->storage_manager(), + output_stream.file()); + DropRelation::Drop(*query_result_relation, + test_database_loader_->catalog_database(), + test_database_loader_->storage_manager()); + } } if (output->empty()) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c608e99e/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp index 06397d4..ee9bee7 100644 --- a/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp +++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.cpp @@ -20,6 +20,7 @@ #include "query_optimizer/tests/ExecutionGeneratorTestRunner.hpp" #include <cstdio> +#include <memory> #include <set> #include <string> @@ -71,37 +72,39 @@ void ExecutionGeneratorTestRunner::runTestCase( } else { const ParseStatement &parse_statement = *result.parsed_statement; std::printf("%s\n", parse_statement.toString().c_str()); + + const CatalogRelation *query_result_relation = nullptr; try { - QueryHandle query_handle(0 /* query_id */, main_thread_client_id_); OptimizerContext optimizer_context; + auto query_handle = std::make_unique<QueryHandle>(0 /* query_id */, main_thread_client_id_); optimizer_.generateQueryHandle(parse_statement, test_database_loader_.catalog_database(), &optimizer_context, - &query_handle); + query_handle.get()); + query_result_relation = query_handle->getQueryResultRelation(); QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( main_thread_client_id_, foreman_->getBusClientID(), - &query_handle, + 
query_handle.release(), &bus_); - - QueryExecutionUtil::ReceiveQueryCompletionMessage( - main_thread_client_id_, &bus_); - - const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation(); - if (query_result_relation) { - PrintToScreen::PrintRelation(*query_result_relation, - test_database_loader_.storage_manager(), - output_stream.file()); - DropRelation::Drop(*query_result_relation, - test_database_loader_.catalog_database(), - test_database_loader_.storage_manager()); - } } catch (const SqlError &error) { *output = error.formatMessage(input); break; } + + QueryExecutionUtil::ReceiveQueryCompletionMessage( + main_thread_client_id_, &bus_); + + if (query_result_relation) { + PrintToScreen::PrintRelation(*query_result_relation, + test_database_loader_.storage_manager(), + output_stream.file()); + DropRelation::Drop(*query_result_relation, + test_database_loader_.catalog_database(), + test_database_loader_.storage_manager()); + } } }