Removed the temporary query result relation from the Catalog in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/aa7f6fe4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/aa7f6fe4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/aa7f6fe4 Branch: refs/heads/aggregate-on-left-outer-join Commit: aa7f6fe4e07804524aca0f1574935ae3f73c985d Parents: dd8747f Author: Zuyu Zhang <zu...@apache.org> Authored: Wed Feb 8 00:33:31 2017 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Wed Feb 8 00:35:53 2017 -0800 ---------------------------------------------------------------------- cli/distributed/CMakeLists.txt | 1 + cli/distributed/Cli.cpp | 20 +++++++++++++++++++- cli/distributed/Conductor.cpp | 13 ++++++++++++- cli/distributed/Conductor.hpp | 4 ++++ query_execution/QueryExecutionMessages.proto | 4 ++++ query_execution/QueryExecutionTypedefs.hpp | 2 ++ 6 files changed, 42 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/cli/distributed/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt index b46082f..5804321 100644 --- a/cli/distributed/CMakeLists.txt +++ b/cli/distributed/CMakeLists.txt @@ -25,6 +25,7 @@ add_library(quickstep_cli_distributed_Role Role.cpp Role.hpp) # Link dependencies: target_link_libraries(quickstep_cli_distributed_Conductor glog + quickstep_catalog_CatalogDatabase quickstep_cli_DefaultsConfigurator quickstep_cli_Flags quickstep_cli_distributed_Role http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/cli/distributed/Cli.cpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp index 5af70e6..386654d 100644 --- a/cli/distributed/Cli.cpp +++ 
b/cli/distributed/Cli.cpp @@ -122,7 +122,10 @@ void Cli::init() { // Prepare for submitting a query. bus_.RegisterClientAsSender(cli_id_, kSqlQueryMessage); + bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage); + bus_.RegisterClientAsSender(cli_id_, kQueryResultTeardownMessage); + bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage); } @@ -191,7 +194,7 @@ void Cli::run() { CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); if (proto.has_result_relation()) { - CatalogRelation result_relation(proto.result_relation()); + const CatalogRelation result_relation(proto.result_relation()); PrintToScreen::PrintRelation(result_relation, storage_manager_.get(), stdout); @@ -199,6 +202,21 @@ void Cli::run() { for (const block_id block : blocks) { storage_manager_->deleteBlockOrBlobFile(block); } + + // Notify Conductor to remove the temp query result relation in the Catalog. + S::QueryResultTeardownMessage proto_response; + proto_response.set_relation_id(result_relation.getID()); + + const size_t proto_response_length = proto_response.ByteSize(); + char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length)); + CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length)); + + TaggedMessage response_message(static_cast<const void*>(proto_response_bytes), + proto_response_length, + kQueryResultTeardownMessage); + free(proto_response_bytes); + + QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(response_message)); } std::chrono::duration<double, std::milli> time_in_ms = end - start; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/cli/distributed/Conductor.cpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp index 13d4d57..cf2eb4b 100644 --- a/cli/distributed/Conductor.cpp +++ b/cli/distributed/Conductor.cpp @@ -28,6 +28,7 @@ 
#include <string> #include <utility> +#include "catalog/CatalogDatabase.hpp" #include "cli/DefaultsConfigurator.hpp" #include "cli/Flags.hpp" #include "parser/ParseStatement.hpp" @@ -73,6 +74,7 @@ void Conductor::init() { } query_processor_ = make_unique<QueryProcessor>(move(catalog_path)); + catalog_database_ = query_processor_->getDefaultDatabase(); } catch (const std::exception &e) { LOG(FATAL) << "FATAL ERROR DURING STARTUP: " << e.what() << "\nIf you intended to create a new database, " @@ -93,12 +95,14 @@ void Conductor::init() { bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage); bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage); + bus_.RegisterClientAsReceiver(conductor_client_id_, kQueryResultTeardownMessage); + block_locator_ = make_unique<BlockLocator>(&bus_); block_locator_->start(); foreman_ = make_unique<ForemanDistributed>(*block_locator_, std::bind(&QueryProcessor::saveCatalog, query_processor_.get()), &bus_, - query_processor_->getDefaultDatabase()); + catalog_database_); foreman_->start(); } @@ -129,6 +133,13 @@ void Conductor::run() { processSqlQueryMessage(sender, new string(move(proto.sql_query()))); break; } + case kQueryResultTeardownMessage: { + S::QueryResultTeardownMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + catalog_database_->dropRelationById(proto.relation_id()); + break; + } default: LOG(FATAL) << "Unknown TMB message type"; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/cli/distributed/Conductor.hpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp index e8c9582..09bf2b9 100644 --- a/cli/distributed/Conductor.hpp +++ b/cli/distributed/Conductor.hpp @@ -34,6 +34,8 @@ namespace quickstep { +class CatalogDatabase; + /** \addtogroup CliDistributed * @{ */ @@ -63,6 +65,8 @@ class Conductor final : public 
Role { SqlParserWrapper parser_wrapper_; std::unique_ptr<QueryProcessor> query_processor_; + // Not owned. + CatalogDatabase *catalog_database_; tmb::client_id conductor_client_id_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index 115a9a3..68f286d 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -138,6 +138,10 @@ message QueryExecutionSuccessMessage { optional CatalogRelationSchema result_relation = 1; } +message QueryResultTeardownMessage { + required int32 relation_id = 1; +} + message QueryExecutionErrorMessage { required string error_message = 1; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/aa7f6fe4/query_execution/QueryExecutionTypedefs.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp index 9f78302..994bd60 100644 --- a/query_execution/QueryExecutionTypedefs.hpp +++ b/query_execution/QueryExecutionTypedefs.hpp @@ -105,6 +105,8 @@ enum QueryExecutionMessageType : message_type_id { kQueryExecutionSuccessMessage, kQueryExecutionErrorMessage, + kQueryResultTeardownMessage, // From CLI to Conductor. + // BlockLocator related messages, sorted in a life cycle of StorageManager // with a unique block domain. kBlockDomainRegistrationMessage, // From Worker to BlockLocator.