Repository: incubator-quickstep Updated Branches: refs/heads/master 5e0c32acd -> 87bbb2629
Added command support in the distributed version. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/87bbb262 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/87bbb262 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/87bbb262 Branch: refs/heads/master Commit: 87bbb2629ddc8f09c997612fca2cd700ce95d040 Parents: 5e0c32a Author: Zuyu Zhang <zu...@apache.org> Authored: Mon Feb 27 00:30:43 2017 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Wed Mar 1 14:41:27 2017 -0800 ---------------------------------------------------------------------- CMakeLists.txt | 2 + cli/distributed/CMakeLists.txt | 3 + cli/distributed/Cli.cpp | 72 ++++-- cli/distributed/Conductor.cpp | 81 +++++- cli/distributed/Conductor.hpp | 3 + cli/tests/CMakeLists.txt | 41 ++++ cli/tests/DistributedCommandExecutorTest.cpp | 62 +++++ .../DistributedCommandExecutorTestRunner.cpp | 246 +++++++++++++++++++ .../DistributedCommandExecutorTestRunner.hpp | 99 ++++++++ cli/tests/command_executor/CMakeLists.txt | 18 ++ query_execution/QueryExecutionMessages.proto | 8 + query_execution/QueryExecutionTypedefs.hpp | 10 +- 12 files changed, 624 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 918069c..9cd02be 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -817,6 +817,7 @@ if (ENABLE_DISTRIBUTED) target_link_libraries(quickstep_distributed_cli_shell glog quickstep_catalog_CatalogRelation + quickstep_cli_Constants quickstep_cli_Flags quickstep_cli_LineReader quickstep_cli_PrintToScreen @@ -833,6 +834,7 @@ if (ENABLE_DISTRIBUTED) quickstep_storage_StorageBlockInfo quickstep_storage_StorageManager quickstep_utility_Macros + quickstep_utility_SqlError quickstep_utility_StringUtil tmb ${GFLAGS_LIB_NAME} http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/distributed/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cli/distributed/CMakeLists.txt b/cli/distributed/CMakeLists.txt index 5804321..1f7dee0 100644 --- a/cli/distributed/CMakeLists.txt +++ b/cli/distributed/CMakeLists.txt @@ -26,6 +26,8 @@ add_library(quickstep_cli_distributed_Role Role.cpp Role.hpp) target_link_libraries(quickstep_cli_distributed_Conductor glog quickstep_catalog_CatalogDatabase + quickstep_cli_CommandExecutorUtil + quickstep_cli_Constants quickstep_cli_DefaultsConfigurator quickstep_cli_Flags quickstep_cli_distributed_Role @@ -41,6 +43,7 @@ target_link_libraries(quickstep_cli_distributed_Conductor quickstep_storage_StorageConstants quickstep_utility_Macros quickstep_utility_SqlError + quickstep_utility_StringUtil tmb) target_link_libraries(quickstep_cli_distributed_Executor glog http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/distributed/Cli.cpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp index 6228898..49b7dc1 100644 --- a/cli/distributed/Cli.cpp +++ b/cli/distributed/Cli.cpp @@ -30,6 +30,7 @@ #include "catalog/CatalogRelation.hpp" #include "cli/CliConfig.h" // For QUICKSTEP_USE_LINENOISE. +#include "cli/Constants.hpp" #include "cli/Flags.hpp" #ifdef QUICKSTEP_USE_LINENOISE @@ -49,6 +50,7 @@ typedef quickstep::LineReaderDumb LineReaderImpl; #include "query_execution/QueryExecutionUtil.hpp" #include "storage/DataExchangerAsync.hpp" #include "storage/StorageBlockInfo.hpp" +#include "utility/SqlError.hpp" #include "utility/StringUtil.hpp" #include "tmb/address.h" @@ -76,6 +78,7 @@ using tmb::client_id; namespace quickstep { +namespace C = cli; namespace S = serialization; void Cli::init() { @@ -127,6 +130,10 @@ void Cli::init() { bus_.RegisterClientAsSender(cli_id_, kQueryResultTeardownMessage); bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionErrorMessage); + + // Prepare for submitting a command. + bus_.RegisterClientAsSender(cli_id_, kCommandMessage); + bus_.RegisterClientAsReceiver(cli_id_, kCommandResponseMessage); } void Cli::run() { @@ -158,27 +165,51 @@ void Cli::run() { break; } - CHECK_NE(statement.getStatementType(), ParseStatement::kCommand) - << "TODO(quickstep-team)"; + if (statement.getStatementType() == ParseStatement::kCommand) { + const ParseCommand &command = static_cast<const ParseCommand &>(statement); + const std::string &command_str = command.command()->value(); + try { + if (command_str == C::kAnalyzeCommand) { + // TODO(zuyu): support '\analyze'. + THROW_SQL_ERROR_AT(command.command()) << "Unsupported Command"; + } else if (command_str != C::kDescribeDatabaseCommand && + command_str != C::kDescribeTableCommand) { + THROW_SQL_ERROR_AT(command.command()) << "Invalid Command"; + } + } catch (const SqlError &error) { + fprintf(stderr, "%s", error.formatMessage(*command_string).c_str()); + reset_parser = true; + break; + } + + DLOG(INFO) << "DistributedCli sent CommandMessage (typed '" << kCommandMessage + << "') to Conductor"; + S::CommandMessage proto; + proto.set_command(*command_string); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage - << "') to Conductor"; - S::SqlQueryMessage proto; - proto.set_sql_query(*command_string); + TaggedMessage command_message(static_cast<const void*>(proto_bytes), proto_length, kCommandMessage); + free(proto_bytes); - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(command_message)); + } else { + DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage + << "') to Conductor"; + S::SqlQueryMessage proto; + proto.set_sql_query(*command_string); - TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes), - proto_length, - kSqlQueryMessage); - free(proto_bytes); + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - QueryExecutionUtil::SendTMBMessage(&bus_, - cli_id_, - conductor_client_id_, - move(sql_query_message)); + TaggedMessage sql_query_message(static_cast<const void*>(proto_bytes), proto_length, kSqlQueryMessage); + free(proto_bytes); + + QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, conductor_client_id_, move(sql_query_message)); + } start = std::chrono::steady_clock::now(); @@ -187,6 +218,13 @@ void Cli::run() { DLOG(INFO) << "DistributedCli received typed '" << tagged_message.message_type() << "' message from client " << annotated_message.sender; switch (tagged_message.message_type()) { + case kCommandResponseMessage: { + S::CommandResponseMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + printf("%s", proto.command_response().c_str()); + break; + } case kQueryExecutionSuccessMessage: { end = std::chrono::steady_clock::now(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/distributed/Conductor.cpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp index 3c68bfb..b877b04 100644 --- a/cli/distributed/Conductor.cpp +++ b/cli/distributed/Conductor.cpp @@ -29,6 +29,8 @@ #include <utility> #include "catalog/CatalogDatabase.hpp" +#include "cli/CommandExecutorUtil.hpp" +#include "cli/Constants.hpp" #include "cli/DefaultsConfigurator.hpp" #include "cli/Flags.hpp" #include "parser/ParseStatement.hpp" @@ -42,6 +44,7 @@ #include "query_optimizer/QueryProcessor.hpp" #include "storage/StorageConstants.hpp" #include "utility/SqlError.hpp" +#include "utility/StringUtil.hpp" #include "tmb/id_typedefs.h" #include "tmb/native_net_client_message_bus.h" @@ -63,6 +66,7 @@ using tmb::client_id; namespace quickstep { +namespace C = cli; namespace S = serialization; void Conductor::init() { @@ -91,6 +95,9 @@ void Conductor::init() { bus_.RegisterClientAsReceiver(conductor_client_id_, kDistributedCliRegistrationMessage); bus_.RegisterClientAsSender(conductor_client_id_, kDistributedCliRegistrationResponseMessage); + bus_.RegisterClientAsReceiver(conductor_client_id_, kCommandMessage); + bus_.RegisterClientAsSender(conductor_client_id_, kCommandResponseMessage); + bus_.RegisterClientAsReceiver(conductor_client_id_, kSqlQueryMessage); bus_.RegisterClientAsSender(conductor_client_id_, kQueryExecutionErrorMessage); bus_.RegisterClientAsSender(conductor_client_id_, kAdmitRequestMessage); @@ -125,6 +132,14 @@ void Conductor::run() { QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message))); break; } + case kCommandMessage: { + S::CommandMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + DLOG(INFO) << "Conductor received the following command: " << proto.command(); + + processCommandMessage(sender, new string(move(proto.command()))); + break; + } case kSqlQueryMessage: { S::SqlQueryMessage proto; CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); @@ -146,6 +161,69 @@ void Conductor::run() { } } +void Conductor::processCommandMessage(const tmb::client_id sender, string *command_string) { + parser_wrapper_.feedNextBuffer(command_string); + ParseResult parse_result = parser_wrapper_.getNextStatement(); + + CHECK(parse_result.condition == ParseResult::kSuccess) + << "Any syntax error should be addressed in the DistributedCli."; + + const ParseStatement &statement = *parse_result.parsed_statement; + DCHECK_EQ(ParseStatement::kCommand, statement.getStatementType()); + + const ParseCommand &command = static_cast<const ParseCommand &>(statement); + const PtrVector<ParseString> &arguments = *(command.arguments()); + const string &command_str = command.command()->value(); + + string command_response; + + try { + if (command_str == C::kDescribeDatabaseCommand) { + command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_); + } else if (command_str == C::kDescribeTableCommand) { + if (arguments.empty()) { + command_response = C::ExecuteDescribeDatabase(arguments, *catalog_database_); + } else { + command_response = C::ExecuteDescribeTable(arguments, *catalog_database_); + } + } + } catch (const SqlError &command_error) { + // Set the query execution status along with the error message. + S::QueryExecutionErrorMessage proto; + proto.set_error_message(command_error.formatMessage(*command_string)); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kQueryExecutionErrorMessage); + free(proto_bytes); + + DLOG(INFO) << "Conductor sent QueryExecutionErrorMessage (typed '" + << kQueryExecutionErrorMessage + << "') to Distributed CLI " << sender; + CHECK(MessageBus::SendStatus::kOK == + QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message))); + } + + S::CommandResponseMessage proto; + proto.set_command_response(command_response); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage); + free(proto_bytes); + + DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage + << "') to Distributed CLI " << sender; + CHECK(MessageBus::SendStatus::kOK == + QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message))); +} + void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *command_string) { parser_wrapper_.feedNextBuffer(command_string); ParseResult parse_result = parser_wrapper_.getNextStatement(); @@ -154,8 +232,7 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm << "Any SQL syntax error should be addressed in the DistributedCli."; const ParseStatement &statement = *parse_result.parsed_statement; - CHECK(statement.getStatementType() != ParseStatement::kCommand) - << "TODO(quickstep-team)"; + DCHECK_NE(ParseStatement::kCommand, statement.getStatementType()); try { auto query_handle = make_unique<QueryHandle>(query_processor_->query_id(), http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/distributed/Conductor.hpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Conductor.hpp b/cli/distributed/Conductor.hpp index 09bf2b9..e7e003f 100644 --- a/cli/distributed/Conductor.hpp +++ b/cli/distributed/Conductor.hpp @@ -35,6 +35,7 @@ namespace quickstep { class CatalogDatabase; +class ParseCommand; /** \addtogroup CliDistributed * @{ @@ -60,6 +61,8 @@ class Conductor final : public Role { void run() override; private: + void processCommandMessage(const tmb::client_id sender, std::string *command_string); + void processSqlQueryMessage(const tmb::client_id sender, std::string *command_string); SqlParserWrapper parser_wrapper_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cli/tests/CMakeLists.txt b/cli/tests/CMakeLists.txt index 48f27bb..7f8150f 100644 --- a/cli/tests/CMakeLists.txt +++ b/cli/tests/CMakeLists.txt @@ -23,6 +23,14 @@ add_executable(quickstep_cli_tests_CommandExecutorTest CommandExecutorTestRunner.hpp "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp" "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp") +if (ENABLE_DISTRIBUTED) + add_executable(quickstep_cli_tests_DistributedCommandExecutorTest + DistributedCommandExecutorTest.cpp + DistributedCommandExecutorTestRunner.cpp + DistributedCommandExecutorTestRunner.hpp + "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.cpp" + "${PROJECT_SOURCE_DIR}/utility/textbased_test/TextBasedTest.hpp") +endif(ENABLE_DISTRIBUTED) target_link_libraries(quickstep_cli_tests_CommandExecutorTest glog @@ -49,3 +57,36 @@ target_link_libraries(quickstep_cli_tests_CommandExecutorTest quickstep_utility_TextBasedTestDriver tmb ${LIBS}) +if (ENABLE_DISTRIBUTED) + target_link_libraries(quickstep_cli_tests_DistributedCommandExecutorTest + glog + gtest + quickstep_catalog_CatalogTypedefs + quickstep_cli_CommandExecutorUtil + quickstep_cli_Constants + quickstep_cli_DropRelation + quickstep_cli_PrintToScreen + quickstep_parser_ParseStatement + quickstep_parser_SqlParserWrapper + quickstep_queryexecution_BlockLocator + quickstep_queryexecution_BlockLocatorUtil + quickstep_queryexecution_ForemanDistributed + quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_QueryExecutionUtil + quickstep_queryexecution_Shiftboss + quickstep_queryexecution_Worker + quickstep_queryexecution_WorkerDirectory + quickstep_queryoptimizer_Optimizer + quickstep_queryoptimizer_OptimizerContext + quickstep_queryoptimizer_QueryHandle + quickstep_queryoptimizer_tests_TestDatabaseLoader + quickstep_storage_DataExchangerAsync + quickstep_storage_StorageManager + quickstep_utility_Macros + quickstep_utility_MemStream + quickstep_utility_SqlError + quickstep_utility_TextBasedTestDriver + tmb + ${GFLAGS_LIB_NAME} + ${LIBS}) +endif(ENABLE_DISTRIBUTED) http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/tests/DistributedCommandExecutorTest.cpp ---------------------------------------------------------------------- diff --git a/cli/tests/DistributedCommandExecutorTest.cpp b/cli/tests/DistributedCommandExecutorTest.cpp new file mode 100644 index 0000000..b41a70f --- /dev/null +++ b/cli/tests/DistributedCommandExecutorTest.cpp @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#include <iostream> +#include <fstream> +#include <memory> + +#include "cli/tests/DistributedCommandExecutorTestRunner.hpp" +#include "utility/textbased_test/TextBasedTestDriver.hpp" + +#include "gflags/gflags.h" +#include "glog/logging.h" +#include "gtest/gtest.h" + +using quickstep::TextBasedTest; + +using std::make_unique; + +QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_COMMAND_EXECUTOR_TEST); + +int main(int argc, char** argv) { + google::InitGoogleLogging(argv[0]); + // Honor FLAGS_buffer_pool_slots in StorageManager. + gflags::ParseCommandLineFlags(&argc, &argv, true); + + if (argc < 4) { + LOG(ERROR) << "Must have at least 3 arguments, but " << argc - 1 + << " are provided"; + } + + std::ifstream input_file(argv[1]); + CHECK(input_file.is_open()) << argv[1]; + + auto test_runner = make_unique<quickstep::DistributedCommandExecutorTestRunner>(argv[3]); + test_driver = make_unique<quickstep::TextBasedTestDriver>(&input_file, test_runner.get()); + test_driver->registerOption( + quickstep::DistributedCommandExecutorTestRunner::kResetOption); + + ::testing::InitGoogleTest(&argc, argv); + const int success = RUN_ALL_TESTS(); + if (success != 0) { + test_driver->writeActualOutputToFile(argv[2]); + } + + return success; +} http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/tests/DistributedCommandExecutorTestRunner.cpp ---------------------------------------------------------------------- diff --git a/cli/tests/DistributedCommandExecutorTestRunner.cpp b/cli/tests/DistributedCommandExecutorTestRunner.cpp new file mode 100644 index 0000000..66d0767 --- /dev/null +++ b/cli/tests/DistributedCommandExecutorTestRunner.cpp @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#include "cli/tests/DistributedCommandExecutorTestRunner.hpp" + +#include <cstdio> +#include <functional> +#include <memory> +#include <set> +#include <string> +#include <utility> +#include <vector> + +#include "catalog/CatalogTypedefs.hpp" +#include "cli/CommandExecutorUtil.hpp" +#include "cli/Constants.hpp" +#include "cli/DropRelation.hpp" +#include "cli/PrintToScreen.hpp" +#include "parser/ParseStatement.hpp" +#include "query_execution/BlockLocator.hpp" +#include "query_execution/BlockLocatorUtil.hpp" +#include "query_execution/ForemanDistributed.hpp" +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/QueryExecutionUtil.hpp" +#include "query_optimizer/Optimizer.hpp" +#include "query_optimizer/OptimizerContext.hpp" +#include "query_optimizer/QueryHandle.hpp" +#include "query_optimizer/tests/TestDatabaseLoader.hpp" +#include "storage/DataExchangerAsync.hpp" +#include "storage/StorageManager.hpp" +#include "utility/MemStream.hpp" +#include "utility/SqlError.hpp" + +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" +#include "tmb/message_bus.h" +#include "tmb/tagged_message.h" + +using std::make_unique; +using std::string; +using std::vector; + +using tmb::TaggedMessage; + +namespace quickstep { + +class CatalogRelation; + +namespace { + +void nop() {} + +} // namespace + +namespace C = cli; + +const char *DistributedCommandExecutorTestRunner::kResetOption = + "reset_before_execution"; + +DistributedCommandExecutorTestRunner::DistributedCommandExecutorTestRunner(const string &storage_path) + : query_id_(0) { + bus_.Initialize(); + + cli_id_ = bus_.Connect(); + bus_.RegisterClientAsSender(cli_id_, kAdmitRequestMessage); + bus_.RegisterClientAsSender(cli_id_, kPoisonMessage); + bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage); + + bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage); + bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage); + + block_locator_ = make_unique<BlockLocator>(&bus_); + block_locator_->start(); + + test_database_loader_ = make_unique<optimizer::TestDatabaseLoader>( + storage_path, + block_locator::getBlockDomain( + test_database_loader_data_exchanger_.network_address(), cli_id_, &locator_client_id_, &bus_), + locator_client_id_, + &bus_); + DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_); + test_database_loader_data_exchanger_.set_storage_manager(test_database_loader_->storage_manager()); + test_database_loader_data_exchanger_.start(); + + test_database_loader_->createTestRelation(false /* allow_vchar */); + test_database_loader_->loadTestRelation(); + + // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former + // could receive a registration message from the latter. + foreman_ = make_unique<ForemanDistributed>(*block_locator_, std::bind(&nop), &bus_, + test_database_loader_->catalog_database()); + + // We don't use the NUMA aware version of worker code. + const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */, + kAnyNUMANodeID); + + bus_local_.Initialize(); + + worker_ = make_unique<Worker>(0 /* worker_thread_index */, &bus_local_); + + const vector<tmb::client_id> worker_client_ids(1, worker_->getBusClientID()); + worker_directory_ = make_unique<WorkerDirectory>(worker_client_ids.size(), worker_client_ids, numa_nodes); + + storage_manager_ = make_unique<StorageManager>( + storage_path, + block_locator::getBlockDomain( + data_exchanger_.network_address(), cli_id_, &locator_client_id_, &bus_), + locator_client_id_, &bus_); + DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_); + + data_exchanger_.set_storage_manager(storage_manager_.get()); + shiftboss_ = + make_unique<Shiftboss>(&bus_, &bus_local_, storage_manager_.get(), worker_directory_.get(), + storage_manager_->hdfs()); + + foreman_->start(); + + data_exchanger_.start(); + shiftboss_->start(); + worker_->start(); +} + +DistributedCommandExecutorTestRunner::~DistributedCommandExecutorTestRunner() { + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, foreman_->getBusClientID(), TaggedMessage(kPoisonMessage)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); + + worker_->join(); + shiftboss_->join(); + + foreman_->join(); + + test_database_loader_data_exchanger_.shutdown(); + test_database_loader_.reset(); + data_exchanger_.shutdown(); + storage_manager_.reset(); + + CHECK(MessageBus::SendStatus::kOK == + QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, locator_client_id_, TaggedMessage(kPoisonMessage))); + + test_database_loader_data_exchanger_.join(); + data_exchanger_.join(); + block_locator_->join(); +} + +void DistributedCommandExecutorTestRunner::runTestCase( + const string &input, const std::set<string> &options, string *output) { + // TODO(qzeng): Test multi-threaded query execution when we have a Sort operator. + + VLOG(4) << "Test SQL(s): " << input; + + if (options.find(kResetOption) != options.end()) { + test_database_loader_->clear(); + test_database_loader_->createTestRelation(false /* allow_vchar */); + test_database_loader_->loadTestRelation(); + } + + MemStream output_stream; + sql_parser_.feedNextBuffer(new string(input)); + + while (true) { + ParseResult result = sql_parser_.getNextStatement(); + if (result.condition != ParseResult::kSuccess) { + if (result.condition == ParseResult::kError) { + *output = result.error_message; + } + break; + } + + const ParseStatement &parse_statement = *result.parsed_statement; + std::printf("%s\n", parse_statement.toString().c_str()); + + try { + if (parse_statement.getStatementType() == ParseStatement::kCommand) { + const ParseCommand &command = static_cast<const ParseCommand &>(parse_statement); + const PtrVector<ParseString> &arguments = *(command.arguments()); + const string &command_str = command.command()->value(); + + string command_response; + if (command_str == C::kDescribeDatabaseCommand) { + command_response = C::ExecuteDescribeDatabase(arguments, *test_database_loader_->catalog_database()); + } else if (command_str == C::kDescribeTableCommand) { + if (arguments.empty()) { + command_response = C::ExecuteDescribeDatabase(arguments, *test_database_loader_->catalog_database()); + } else { + command_response = C::ExecuteDescribeTable(arguments, *test_database_loader_->catalog_database()); + } + } else { + THROW_SQL_ERROR_AT(command.command()) << "Unsupported command"; + } + + std::fprintf(output_stream.file(), "%s", command_response.c_str()); + } else { + optimizer::OptimizerContext optimizer_context; + auto query_handle = std::make_unique<QueryHandle>(query_id_++, cli_id_); + + optimizer_.generateQueryHandle(parse_statement, + test_database_loader_->catalog_database(), + &optimizer_context, + query_handle.get()); + const CatalogRelation *query_result_relation = query_handle->getQueryResultRelation(); + + QueryExecutionUtil::ConstructAndSendAdmitRequestMessage( + cli_id_, foreman_->getBusClientID(), query_handle.release(), &bus_); + + const tmb::AnnotatedMessage annotated_message = bus_.Receive(cli_id_, 0, true); + DCHECK_EQ(kQueryExecutionSuccessMessage, annotated_message.tagged_message.message_type()); + + if (query_result_relation) { + PrintToScreen::PrintRelation(*query_result_relation, + test_database_loader_->storage_manager(), + output_stream.file()); + DropRelation::Drop(*query_result_relation, + test_database_loader_->catalog_database(), + test_database_loader_->storage_manager()); + } + } + } catch (const SqlError &error) { + *output = error.formatMessage(input); + break; + } + } + + if (output->empty()) { + *output = output_stream.str(); + } +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/tests/DistributedCommandExecutorTestRunner.hpp ---------------------------------------------------------------------- diff --git a/cli/tests/DistributedCommandExecutorTestRunner.hpp b/cli/tests/DistributedCommandExecutorTestRunner.hpp new file mode 100644 index 0000000..0427a85 --- /dev/null +++ b/cli/tests/DistributedCommandExecutorTestRunner.hpp @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_CLI_TESTS_DISTRIBUTED_COMMAND_EXECUTOR_TEST_RUNNER_HPP_ +#define QUICKSTEP_CLI_TESTS_DISTRIBUTED_COMMAND_EXECUTOR_TEST_RUNNER_HPP_ + +#include <cstddef> +#include <memory> +#include <set> +#include <string> +#include <utility> + +#include "parser/SqlParserWrapper.hpp" +#include "query_execution/BlockLocator.hpp" +#include "query_execution/ForemanDistributed.hpp" +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/Shiftboss.hpp" +#include "query_execution/Worker.hpp" +#include "query_execution/WorkerDirectory.hpp" +#include "query_optimizer/Optimizer.hpp" +#include "query_optimizer/tests/TestDatabaseLoader.hpp" +#include "storage/DataExchangerAsync.hpp" +#include "storage/StorageManager.hpp" +#include "utility/Macros.hpp" +#include "utility/textbased_test/TextBasedTestRunner.hpp" + +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" + +namespace quickstep { + +/** + * @brief TextBasedTestRunner for testing the CommandExecutor in the + * distributed version. + */ +class DistributedCommandExecutorTestRunner : public TextBasedTestRunner { + public: + /** + * @brief If this option is enabled, recreate the entire database and + * repopulate the data before every test. + */ + static const char *kResetOption; + + /** + * @brief Constructor. + */ + explicit DistributedCommandExecutorTestRunner(const std::string &storage_path); + + ~DistributedCommandExecutorTestRunner(); + + void runTestCase(const std::string &input, + const std::set<std::string> &options, + std::string *output) override; + + private: + std::size_t query_id_; + + SqlParserWrapper sql_parser_; + std::unique_ptr<optimizer::TestDatabaseLoader> test_database_loader_; + DataExchangerAsync test_database_loader_data_exchanger_; + optimizer::Optimizer optimizer_; + + MessageBusImpl bus_; + tmb::client_id cli_id_, locator_client_id_; + + std::unique_ptr<BlockLocator> block_locator_; + + std::unique_ptr<ForemanDistributed> foreman_; + + MessageBusImpl bus_local_; + std::unique_ptr<Worker> worker_; + std::unique_ptr<WorkerDirectory> worker_directory_; + DataExchangerAsync data_exchanger_; + std::unique_ptr<StorageManager> storage_manager_; + std::unique_ptr<Shiftboss> shiftboss_; + + DISALLOW_COPY_AND_ASSIGN(DistributedCommandExecutorTestRunner); +}; + +} // namespace quickstep + +#endif // QUICKSTEP_CLI_TESTS_DISTRIBUTED_COMMAND_EXECUTOR_TEST_RUNNER_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/cli/tests/command_executor/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cli/tests/command_executor/CMakeLists.txt b/cli/tests/command_executor/CMakeLists.txt index 9cf1869..e62d954 100644 --- a/cli/tests/command_executor/CMakeLists.txt +++ b/cli/tests/command_executor/CMakeLists.txt @@ -26,7 +26,25 @@ add_test(quickstep_cli_tests_commandexecutor_dt "${CMAKE_CURRENT_BINARY_DIR}/Dt.test" "${CMAKE_CURRENT_BINARY_DIR}/Dt/") +if (ENABLE_DISTRIBUTED) + add_test(quickstep_cli_tests_commandexecutor_d_distributed + "../quickstep_cli_tests_DistributedCommandExecutorTest" + "${CMAKE_CURRENT_SOURCE_DIR}/D.test" + "${CMAKE_CURRENT_BINARY_DIR}/DDistributed.test" + "${CMAKE_CURRENT_BINARY_DIR}/DDistributed/") + add_test(quickstep_cli_tests_commandexecutor_dt_distributed + "../quickstep_cli_tests_DistributedCommandExecutorTest" + "${CMAKE_CURRENT_SOURCE_DIR}/Dt.test" + "${CMAKE_CURRENT_BINARY_DIR}/DtDistributed.test" + "${CMAKE_CURRENT_BINARY_DIR}/DtDistributed/") +endif(ENABLE_DISTRIBUTED) + # Create the folders where the unit tests will store their data blocks for the # duration of their test. file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/D) file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Dt) + +if (ENABLE_DISTRIBUTED) + file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DDistributed) + file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DtDistributed) +endif(ENABLE_DISTRIBUTED) http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index 68f286d..47246d8 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -81,6 +81,10 @@ message ShiftbossRegistrationResponseMessage { required uint64 shiftboss_index = 1; } +message CommandMessage { + required string command = 1; +} + message SqlQueryMessage { required string sql_query = 1; } @@ -134,6 +138,10 @@ message SaveQueryResultResponseMessage { required uint64 shiftboss_index = 4; } +message CommandResponseMessage { + required string command_response = 1; +} + message QueryExecutionSuccessMessage { optional CatalogRelationSchema result_relation = 1; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/87bbb262/query_execution/QueryExecutionTypedefs.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp index 994bd60..0fd0bdf 100644 --- a/query_execution/QueryExecutionTypedefs.hpp +++ b/query_execution/QueryExecutionTypedefs.hpp @@ -89,7 +89,11 @@ enum QueryExecutionMessageType : message_type_id { // Shiftboss to Worker. kDistributedCliRegistrationMessage, // From CLI to Conductor. kDistributedCliRegistrationResponseMessage, // From Conductor to CLI. - kSqlQueryMessage, // From CLI to Conductor. + + // From CLI to Conductor. + kCommandMessage, + kSqlQueryMessage, + kQueryInitiateMessage, // From Foreman to Shiftboss. kQueryInitiateResponseMessage, // From Shiftboss to Foreman. @@ -101,8 +105,10 @@ enum QueryExecutionMessageType : message_type_id { kSaveQueryResultMessage, // From Foreman to Shiftboss. kSaveQueryResultResponseMessage, // From Shiftboss to Foreman. + kQueryExecutionSuccessMessage, // From Foreman to CLI. + // From Foreman / Conductor to CLI. - kQueryExecutionSuccessMessage, + kCommandResponseMessage, kQueryExecutionErrorMessage, kQueryResultTeardownMessage, // From CLI to Conductor.