Use BlockLocator and DataExchangerAsync in the distributed tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/787a3251 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/787a3251 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/787a3251 Branch: refs/heads/exact-filter Commit: 787a3251019162610ebe13efbd341b3f9ac7a268 Parents: 3093e74 Author: Zuyu Zhang <zu...@apache.org> Authored: Fri Nov 4 23:12:09 2016 -0700 Committer: Zuyu Zhang <zu...@apache.org> Committed: Mon Nov 14 20:38:04 2016 -0800 ---------------------------------------------------------------------- query_optimizer/tests/CMakeLists.txt | 8 ++ .../DistributedExecutionGeneratorTestRunner.cpp | 104 ++++++++++++++++--- .../DistributedExecutionGeneratorTestRunner.hpp | 32 +++++- query_optimizer/tests/TestDatabaseLoader.hpp | 59 +++++++++-- storage/DataExchangerAsync.cpp | 4 +- 5 files changed, 178 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/query_optimizer/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt index ac4548a..9c764e4 100644 --- a/query_optimizer/tests/CMakeLists.txt +++ b/query_optimizer/tests/CMakeLists.txt @@ -79,6 +79,10 @@ target_link_libraries(quickstep_queryoptimizer_tests_TestDatabaseLoader quickstep_types_containers_Tuple quickstep_utility_Macros tmb) +if (ENABLE_DISTRIBUTED) + target_link_libraries(quickstep_queryoptimizer_tests_TestDatabaseLoader + quickstep_storage_StorageBlockInfo) +endif(ENABLE_DISTRIBUTED) if (ENABLE_DISTRIBUTED) add_executable(quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest @@ -110,7 +114,9 @@ if (ENABLE_DISTRIBUTED) quickstep_cli_PrintToScreen quickstep_parser_ParseStatement quickstep_parser_SqlParserWrapper + quickstep_queryexecution_BlockLocator quickstep_queryexecution_ForemanDistributed + quickstep_queryexecution_QueryExecutionMessages_proto quickstep_queryexecution_QueryExecutionTypedefs quickstep_queryexecution_QueryExecutionUtil quickstep_queryexecution_Shiftboss @@ -120,6 +126,8 @@ if (ENABLE_DISTRIBUTED) quickstep_queryoptimizer_OptimizerContext quickstep_queryoptimizer_QueryHandle quickstep_queryoptimizer_tests_TestDatabaseLoader + quickstep_storage_DataExchangerAsync + quickstep_storage_StorageBlockInfo quickstep_utility_Macros quickstep_utility_MemStream quickstep_utility_SqlError http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp index 5cccc21..0403e77 100644 --- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp +++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp @@ -20,18 +20,26 @@ #include "query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp" #include <cstdio> +#include <cstdlib> +#include <memory> #include <set> #include <string> +#include <utility> #include <vector> #include "catalog/CatalogTypedefs.hpp" #include "cli/DropRelation.hpp" #include "cli/PrintToScreen.hpp" #include "parser/ParseStatement.hpp" +#include "query_execution/BlockLocator.hpp" #include "query_execution/ForemanDistributed.hpp" +#include "query_execution/QueryExecutionMessages.pb.h" #include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/QueryExecutionUtil.hpp" #include "query_optimizer/OptimizerContext.hpp" #include "query_optimizer/QueryHandle.hpp" +#include "storage/DataExchangerAsync.hpp" +#include "storage/StorageBlockInfo.hpp" #include "utility/MemStream.hpp" #include "utility/SqlError.hpp" @@ -41,10 +49,15 @@ #include "tmb/message_bus.h" #include "tmb/tagged_message.h" -using std::string; +using std::free; using std::make_unique; +using std::malloc; +using std::move; +using std::string; using std::vector; +using tmb::TaggedMessage; + namespace quickstep { class CatalogRelation; @@ -56,10 +69,7 @@ const char *DistributedExecutionGeneratorTestRunner::kResetOption = DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner(const string &storage_path) : query_id_(0), - test_database_loader_(storage_path) { - test_database_loader_.createTestRelation(false /* allow_vchar */); - test_database_loader_.loadTestRelation(); - + data_exchangers_(kNumInstances) { bus_.Initialize(); cli_id_ = bus_.Connect(); @@ -67,9 +77,27 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner bus_.RegisterClientAsSender(cli_id_, kPoisonMessage); bus_.RegisterClientAsReceiver(cli_id_, kQueryExecutionSuccessMessage); + bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage); + bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage); + + block_locator_ = make_unique<BlockLocator>(&bus_); + locator_client_id_ = block_locator_->getBusClientID(); + block_locator_->start(); + + test_database_loader_ = make_unique<TestDatabaseLoader>( + storage_path, + getBlockDomain(test_database_loader_data_exchanger_.network_address()), + locator_client_id_, + &bus_); + test_database_loader_data_exchanger_.set_storage_manager(test_database_loader_->storage_manager()); + test_database_loader_data_exchanger_.start(); + + test_database_loader_->createTestRelation(false /* allow_vchar */); + test_database_loader_->loadTestRelation(); + // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former // could receive a registration message from the latter. - foreman_ = make_unique<ForemanDistributed>(&bus_, test_database_loader_.catalog_database()); + foreman_ = make_unique<ForemanDistributed>(&bus_, test_database_loader_->catalog_database()); // We don't use the NUMA aware version of worker code. const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */, @@ -78,17 +106,24 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner for (int i = 0; i < kNumInstances; ++i) { workers_.push_back(make_unique<Worker>(0 /* worker_thread_index */, &bus_)); - const vector<tmb::client_id> worker_client_ids(1, workers_[i]->getBusClientID()); + const vector<tmb::client_id> worker_client_ids(1, workers_.back()->getBusClientID()); worker_directories_.push_back( make_unique<WorkerDirectory>(worker_client_ids.size(), worker_client_ids, numa_nodes)); + auto storage_manager = make_unique<StorageManager>( + storage_path, getBlockDomain(data_exchangers_[i].network_address()), locator_client_id_, &bus_); + + data_exchangers_[i].set_storage_manager(storage_manager.get()); shiftbosses_.push_back( - make_unique<Shiftboss>(&bus_, test_database_loader_.storage_manager(), worker_directories_[i].get())); + make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get())); + + storage_managers_.push_back(move(storage_manager)); } foreman_->start(); for (int i = 0; i < kNumInstances; ++i) { + data_exchangers_[i].start(); shiftbosses_[i]->start(); workers_[i]->start(); } @@ -101,9 +136,9 @@ void DistributedExecutionGeneratorTestRunner::runTestCase( VLOG(4) << "Test SQL(s): " << input; if (options.find(kResetOption) != options.end()) { - test_database_loader_.clear(); - test_database_loader_.createTestRelation(false /* allow_vchar */); - test_database_loader_.loadTestRelation(); + test_database_loader_->clear(); + test_database_loader_->createTestRelation(false /* allow_vchar */); + test_database_loader_->loadTestRelation(); } MemStream output_stream; @@ -125,7 +160,7 @@ void DistributedExecutionGeneratorTestRunner::runTestCase( QueryHandle query_handle(query_id_++, cli_id_); optimizer_.generateQueryHandle(parse_statement, - test_database_loader_.catalog_database(), + test_database_loader_->catalog_database(), &optimizer_context, &query_handle); @@ -141,11 +176,11 @@ void DistributedExecutionGeneratorTestRunner::runTestCase( const CatalogRelation *query_result_relation = query_handle.getQueryResultRelation(); if (query_result_relation) { PrintToScreen::PrintRelation(*query_result_relation, - test_database_loader_.storage_manager(), + test_database_loader_->storage_manager(), output_stream.file()); DropRelation::Drop(*query_result_relation, - test_database_loader_.catalog_database(), - test_database_loader_.storage_manager()); + test_database_loader_->catalog_database(), + test_database_loader_->storage_manager()); } } catch (const SqlError &error) { *output = error.formatMessage(input); @@ -158,5 +193,44 @@ void DistributedExecutionGeneratorTestRunner::runTestCase( } } +block_id_domain DistributedExecutionGeneratorTestRunner::getBlockDomain( + const string &network_address) { + serialization::BlockDomainRegistrationMessage proto; + proto.set_domain_network_address(network_address); + + const int proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kBlockDomainRegistrationMessage); + free(proto_bytes); + + DLOG(INFO) << "Client (id '" << cli_id_ + << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage + << "') to BlockLocator (id '" << locator_client_id_ << "')"; + + CHECK(MessageBus::SendStatus::kOK == + QueryExecutionUtil::SendTMBMessage(&bus_, + cli_id_, + locator_client_id_, + move(message))); + + const tmb::AnnotatedMessage annotated_message(bus_.Receive(cli_id_, 0, true)); + const TaggedMessage &tagged_message = annotated_message.tagged_message; + CHECK_EQ(locator_client_id_, annotated_message.sender); + CHECK_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type()); + DLOG(INFO) << "Client (id '" << cli_id_ + << "') received BlockDomainRegistrationResponseMessage (typed '" + << kBlockDomainRegistrationResponseMessage + << "') from BlockLocator"; + + serialization::BlockDomainMessage response_proto; + CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + return static_cast<block_id_domain>(response_proto.block_domain()); +} + } // namespace optimizer } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp index ab10841..d2b13e4 100644 --- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp +++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp @@ -28,6 +28,7 @@ #include <vector> #include "parser/SqlParserWrapper.hpp" +#include "query_execution/BlockLocator.hpp" #include "query_execution/ForemanDistributed.hpp" #include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/QueryExecutionUtil.hpp" @@ -36,6 +37,8 @@ #include "query_execution/WorkerDirectory.hpp" #include "query_optimizer/Optimizer.hpp" #include "query_optimizer/tests/TestDatabaseLoader.hpp" +#include "storage/DataExchangerAsync.hpp" +#include "storage/StorageBlockInfo.hpp" #include "utility/Macros.hpp" #include "utility/textbased_test/TextBasedTestRunner.hpp" @@ -86,6 +89,25 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner { } foreman_->join(); + + test_database_loader_data_exchanger_.shutdown(); + test_database_loader_.reset(); + for (int i = 0; i < kNumInstances; ++i) { + data_exchangers_[i].shutdown(); + storage_managers_[i].reset(); + } + + CHECK(MessageBus::SendStatus::kOK == + QueryExecutionUtil::SendTMBMessage(&bus_, + cli_id_, + locator_client_id_, + tmb::TaggedMessage(quickstep::kPoisonMessage))); + + test_database_loader_data_exchanger_.join(); + for (int i = 0; i < kNumInstances; ++i) { + data_exchangers_[i].join(); + } + block_locator_->join(); } void runTestCase(const std::string &input, @@ -93,20 +115,26 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner { std::string *output) override; private: + block_id_domain getBlockDomain(const std::string &network_address); + std::size_t query_id_; SqlParserWrapper sql_parser_; - TestDatabaseLoader test_database_loader_; + std::unique_ptr<TestDatabaseLoader> test_database_loader_; + DataExchangerAsync test_database_loader_data_exchanger_; Optimizer optimizer_; MessageBusImpl bus_; + tmb::client_id cli_id_, locator_client_id_; - tmb::client_id cli_id_; + std::unique_ptr<BlockLocator> block_locator_; std::unique_ptr<ForemanDistributed> foreman_; std::vector<std::unique_ptr<Worker>> workers_; std::vector<std::unique_ptr<WorkerDirectory>> worker_directories_; + std::vector<DataExchangerAsync> data_exchangers_; + std::vector<std::unique_ptr<StorageManager>> storage_managers_; std::vector<std::unique_ptr<Shiftboss>> shiftbosses_; DISALLOW_COPY_AND_ASSIGN(DistributedExecutionGeneratorTestRunner); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/query_optimizer/tests/TestDatabaseLoader.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/TestDatabaseLoader.hpp b/query_optimizer/tests/TestDatabaseLoader.hpp index d49719d..87c19c6 100644 --- a/query_optimizer/tests/TestDatabaseLoader.hpp +++ b/query_optimizer/tests/TestDatabaseLoader.hpp @@ -24,12 +24,21 @@ #include "catalog/CatalogDatabase.hpp" #include "query_execution/QueryExecutionTypedefs.hpp" + +#ifdef QUICKSTEP_DISTRIBUTED +#include "storage/StorageBlockInfo.hpp" +#endif // QUICKSTEP_DISTRIBUTED + #include "storage/StorageManager.hpp" #include "threading/ThreadIDBasedMap.hpp" #include "utility/Macros.hpp" #include "tmb/id_typedefs.h" +#ifdef QUICKSTEP_DISTRIBUTED +namespace tmb { class MessageBus; } +#endif // QUICKSTEP_DISTRIBUTED + namespace quickstep { class CatalogRelation; @@ -60,18 +69,34 @@ class TestDatabaseLoader { 0 /* id */), storage_manager_(storage_path), test_relation_(nullptr) { - bus_.Initialize(); - - const tmb::client_id worker_thread_client_id = bus_.Connect(); - bus_.RegisterClientAsSender(worker_thread_client_id, kCatalogRelationNewBlockMessage); - - // Refer to InsertDestination::sendBlockFilledMessage for the rationale - // behind using ClientIDMap. - thread_id_map_->addValue(worker_thread_client_id); + init(); + } - scheduler_client_id_ = bus_.Connect(); - bus_.RegisterClientAsReceiver(scheduler_client_id_, kCatalogRelationNewBlockMessage); +#ifdef QUICKSTEP_DISTRIBUTED + /** + * @brief Constructor for the distributed version. + * + * @param storage_path A filesystem directory where the blocks may be + * evicted to during the execution of a test query. + * Can be empty if the test query is not executed + * in the query engine. + * @param block_domain The block_domain for StorageManager. + * @param locator_client_id The client id of BlockLocator for StorageManager. + * @param bus_global The Bus for StorageManager. + */ + TestDatabaseLoader(const std::string &storage_path, + const block_id_domain block_domain, + const tmb::client_id locator_client_id, + tmb::MessageBus *bus_global) + : thread_id_map_(ClientIDMap::Instance()), + catalog_database_(nullptr /* parent */, + "TestDatabase" /* name */, + 0 /* id */), + storage_manager_(storage_path, block_domain, locator_client_id, bus_global), + test_relation_(nullptr) { + init(); } +#endif // QUICKSTEP_DISTRIBUTED ~TestDatabaseLoader() { clear(); @@ -139,6 +164,20 @@ class TestDatabaseLoader { void clear(); private: + void init() { + bus_.Initialize(); + + const tmb::client_id worker_thread_client_id = bus_.Connect(); + bus_.RegisterClientAsSender(worker_thread_client_id, kCatalogRelationNewBlockMessage); + + // Refer to InsertDestination::sendBlockFilledMessage for the rationale + // behind using ClientIDMap. + thread_id_map_->addValue(worker_thread_client_id); + + scheduler_client_id_ = bus_.Connect(); + bus_.RegisterClientAsReceiver(scheduler_client_id_, kCatalogRelationNewBlockMessage); + } + /** * @brief Simulate Foreman to add all new blocks to the relation. */ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/787a3251/storage/DataExchangerAsync.cpp ---------------------------------------------------------------------- diff --git a/storage/DataExchangerAsync.cpp b/storage/DataExchangerAsync.cpp index 59f5ebf..1d2f7db 100644 --- a/storage/DataExchangerAsync.cpp +++ b/storage/DataExchangerAsync.cpp @@ -155,11 +155,11 @@ void DataExchangerAsync::run() { if (ok) { call_context->Proceed(); } else { - LOG(WARNING) << "Not ok\n"; + LOG(WARNING) << "DataExchangerAsync " << server_address_ << " is not ok"; delete call_context; } } else { - LOG(INFO) << "Shutdown\n"; + LOG(INFO) << "DataExchangerAsync " << server_address_ << " shuts down"; return; } }