Repository: incubator-quickstep Updated Branches: refs/heads/dist-fix [created] 0d2a17943
Style fixes in the distributed version. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0d2a1794 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0d2a1794 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0d2a1794 Branch: refs/heads/dist-fix Commit: 0d2a17943097a25c9261d49ff1a0930f6aa86cbf Parents: 5f5073f Author: Zuyu Zhang <zu...@apache.org> Authored: Tue Feb 28 14:19:17 2017 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Tue Feb 28 14:23:33 2017 -0800 ---------------------------------------------------------------------- cli/distributed/Cli.cpp | 4 +- cli/distributed/Conductor.cpp | 4 +- .../tests/DistributedExecutionGeneratorTest.cpp | 10 ++--- .../DistributedExecutionGeneratorTestRunner.cpp | 31 ++++++++++++++ .../DistributedExecutionGeneratorTestRunner.hpp | 45 +------------------- 5 files changed, 41 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0d2a1794/cli/distributed/Cli.cpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp index 60b9c8d..6228898 100644 --- a/cli/distributed/Cli.cpp +++ b/cli/distributed/Cli.cpp @@ -48,8 +48,8 @@ typedef quickstep::LineReaderDumb LineReaderImpl; #include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/QueryExecutionUtil.hpp" #include "storage/DataExchangerAsync.hpp" -#include "utility/StringUtil.hpp" #include "storage/StorageBlockInfo.hpp" +#include "utility/StringUtil.hpp" #include "tmb/address.h" #include "tmb/id_typedefs.h" @@ -76,7 +76,7 @@ using tmb::client_id; namespace quickstep { -namespace S = ::quickstep::serialization; +namespace S = serialization; void Cli::init() { cli_id_ = bus_.Connect(); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0d2a1794/cli/distributed/Conductor.cpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp index cf2eb4b..3c68bfb 100644 --- a/cli/distributed/Conductor.cpp +++ b/cli/distributed/Conductor.cpp @@ -63,13 +63,13 @@ using tmb::client_id; namespace quickstep { -namespace S = ::quickstep::serialization; +namespace S = serialization; void Conductor::init() { try { string catalog_path = FLAGS_storage_path + kCatalogFilename; - if (quickstep::FLAGS_initialize_db) { // Initialize the database + if (FLAGS_initialize_db) { // Initialize the database DefaultsConfigurator::InitializeDefaultDatabase(FLAGS_storage_path, catalog_path); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0d2a1794/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp index 1e2120e..b18b5ec 100644 --- a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp +++ b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp @@ -39,6 +39,8 @@ DECLARE_bool(use_filter_joins); using quickstep::TextBasedTest; +using std::make_unique; + QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_EXECUTION_GENERATOR_TEST); int main(int argc, char** argv) { @@ -59,11 +61,9 @@ int main(int argc, char** argv) { std::ifstream input_file(argv[1]); CHECK(input_file.is_open()) << argv[1]; - std::unique_ptr<quickstep::optimizer::DistributedExecutionGeneratorTestRunner> - test_runner( - new quickstep::optimizer::DistributedExecutionGeneratorTestRunner(argv[3])); - test_driver.reset( - new quickstep::TextBasedTestDriver(&input_file, test_runner.get())); + + auto test_runner = make_unique<quickstep::optimizer::DistributedExecutionGeneratorTestRunner>(argv[3]); + test_driver = make_unique<quickstep::TextBasedTestDriver>(&input_file, test_runner.get()); test_driver->registerOption( quickstep::optimizer::DistributedExecutionGeneratorTestRunner::kResetOption); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0d2a1794/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp index 6bd7a1f..878ae3a 100644 --- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp +++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp @@ -67,6 +67,8 @@ namespace optimizer { namespace { +constexpr int kNumInstances = 3; + void nop() {} } // namespace @@ -147,6 +149,35 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner } } +DistributedExecutionGeneratorTestRunner::~DistributedExecutionGeneratorTestRunner() { + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, foreman_->getBusClientID(), TaggedMessage(kPoisonMessage)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); + + for (int i = 0; i < kNumInstances; ++i) { + workers_[i]->join(); + shiftbosses_[i]->join(); + } + + foreman_->join(); + + test_database_loader_data_exchanger_.shutdown(); + test_database_loader_.reset(); + for (int i = 0; i < kNumInstances; ++i) { + data_exchangers_[i].shutdown(); + storage_managers_[i].reset(); + } + + CHECK(MessageBus::SendStatus::kOK == + QueryExecutionUtil::SendTMBMessage(&bus_, cli_id_, locator_client_id_, TaggedMessage(kPoisonMessage))); + + test_database_loader_data_exchanger_.join(); + for (int i = 0; i < kNumInstances; ++i) { + data_exchangers_[i].join(); + } + block_locator_->join(); +} + void DistributedExecutionGeneratorTestRunner::runTestCase( const string &input, const std::set<string> &options, string *output) { // TODO(qzeng): Test multi-threaded query execution when we have a Sort operator. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0d2a1794/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp index 2cd2427..2c0381b 100644 --- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp +++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp @@ -31,7 +31,6 @@ #include "query_execution/BlockLocator.hpp" #include "query_execution/ForemanDistributed.hpp" #include "query_execution/QueryExecutionTypedefs.hpp" -#include "query_execution/QueryExecutionUtil.hpp" #include "query_execution/Shiftboss.hpp" #include "query_execution/Worker.hpp" #include "query_execution/WorkerDirectory.hpp" @@ -45,16 +44,10 @@ #include "glog/logging.h" #include "tmb/id_typedefs.h" -#include "tmb/message_bus.h" -#include "tmb/tagged_message.h" namespace quickstep { namespace optimizer { -namespace { -constexpr int kNumInstances = 3; -} // namespace - /** * @brief TextBasedTestRunner for testing the ExecutionGenerator in the * distributed version. @@ -72,43 +65,7 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner { */ explicit DistributedExecutionGeneratorTestRunner(const std::string &storage_path); - ~DistributedExecutionGeneratorTestRunner() { - tmb::TaggedMessage poison_tagged_message(quickstep::kPoisonMessage); - - const tmb::MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage( - &bus_, - cli_id_, - foreman_->getBusClientID(), - std::move(poison_tagged_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK); - - for (int i = 0; i < kNumInstances; ++i) { - workers_[i]->join(); - shiftbosses_[i]->join(); - } - - foreman_->join(); - - test_database_loader_data_exchanger_.shutdown(); - test_database_loader_.reset(); - for (int i = 0; i < kNumInstances; ++i) { - data_exchangers_[i].shutdown(); - storage_managers_[i].reset(); - } - - CHECK(MessageBus::SendStatus::kOK == - QueryExecutionUtil::SendTMBMessage(&bus_, - cli_id_, - locator_client_id_, - tmb::TaggedMessage(quickstep::kPoisonMessage))); - - test_database_loader_data_exchanger_.join(); - for (int i = 0; i < kNumInstances; ++i) { - data_exchangers_[i].join(); - } - block_locator_->join(); - } + ~DistributedExecutionGeneratorTestRunner(); void runTestCase(const std::string &input, const std::set<std::string> &options,