Repository: incubator-quickstep Updated Branches: refs/heads/collision-free-agg 0dce4d2ee -> 8dbac18b0 (forced update)
Saved catalog in the distributed version. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/27a80558 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/27a80558 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/27a80558 Branch: refs/heads/collision-free-agg Commit: 27a8055872f82737c35f6f0914ce43bcbe272ce3 Parents: dda085c Author: Zuyu Zhang <zu...@apache.org> Authored: Sun Feb 5 02:16:34 2017 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Sun Feb 5 02:16:34 2017 -0800 ---------------------------------------------------------------------- cli/distributed/Conductor.cpp | 5 ++++- query_execution/ForemanDistributed.cpp | 3 +++ query_execution/ForemanDistributed.hpp | 4 ++++ query_execution/PolicyEnforcerDistributed.cpp | 2 ++ query_execution/PolicyEnforcerDistributed.hpp | 7 +++++++ .../tests/DistributedExecutionGeneratorTestRunner.cpp | 10 +++++++++- 6 files changed, 29 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/cli/distributed/Conductor.cpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp index c4a2721..13d4d57 100644 --- a/cli/distributed/Conductor.cpp +++ b/cli/distributed/Conductor.cpp @@ -22,6 +22,7 @@ #include <cstddef> #include <cstdlib> #include <exception> +#include <functional> #include <memory> #include <sstream> #include <string> @@ -95,7 +96,9 @@ void Conductor::init() { block_locator_ = make_unique<BlockLocator>(&bus_); block_locator_->start(); - foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, query_processor_->getDefaultDatabase()); + foreman_ = make_unique<ForemanDistributed>(*block_locator_, + std::bind(&QueryProcessor::saveCatalog, query_processor_.get()), &bus_, + query_processor_->getDefaultDatabase()); foreman_->start(); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp index fe4c483..4d95f16 100644 --- a/query_execution/ForemanDistributed.cpp +++ b/query_execution/ForemanDistributed.cpp @@ -17,6 +17,7 @@ #include <cstddef> #include <cstdio> #include <cstdlib> +#include <functional> #include <memory> #include <unordered_map> #include <unordered_set> @@ -67,6 +68,7 @@ class QueryHandle; ForemanDistributed::ForemanDistributed( const BlockLocator &block_locator, + std::function<void()> &&save_catalog_callback, MessageBus *bus, CatalogDatabaseLite *catalog_database, const int cpu_id) @@ -106,6 +108,7 @@ ForemanDistributed::ForemanDistributed( policy_enforcer_ = std::make_unique<PolicyEnforcerDistributed>( foreman_client_id_, + move(save_catalog_callback), catalog_database_, &shiftboss_directory_, bus_); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/query_execution/ForemanDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp index ed09fda..5f1a14b 100644 --- a/query_execution/ForemanDistributed.hpp +++ b/query_execution/ForemanDistributed.hpp @@ -17,6 +17,7 @@ #include <cstddef> #include <cstdio> +#include <functional> #include <memory> #include <unordered_map> #include <unordered_set> @@ -55,6 +56,8 @@ class ForemanDistributed final : public ForemanBase { * @param block_locator The block locator that manages block location info. * @param bus A pointer to the TMB. * @param catalog_database The catalog database where this query is executed. + * @param save_catalog_callback The callback used to save catalog upon the query + * completion. * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned. * * @note If cpu_id is not specified, Foreman thread can be possibly moved @@ -62,6 +65,7 @@ class ForemanDistributed final : public ForemanBase { **/ ForemanDistributed( const BlockLocator &block_locator, + std::function<void()> &&save_catalog_callback, tmb::MessageBus *bus, CatalogDatabaseLite *catalog_database, const int cpu_id = -1); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp index e9f57d3..38b8a34 100644 --- a/query_execution/PolicyEnforcerDistributed.cpp +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -227,6 +227,8 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage } if (query_result == nullptr) { + save_catalog_callback_(); + // Clean up query execution states, i.e., QueryContext, in Shiftbosses. serialization::QueryTeardownMessage proto; proto.set_query_id(query_id); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/query_execution/PolicyEnforcerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp index 248948a..f8476c8 100644 --- a/query_execution/PolicyEnforcerDistributed.hpp +++ b/query_execution/PolicyEnforcerDistributed.hpp @@ -16,6 +16,7 @@ #define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_ #include <cstddef> +#include <functional> #include <memory> #include <unordered_map> #include <utility> @@ -57,15 +58,19 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase { * @brief Constructor. * * @param foreman_client_id The TMB client ID of the Foreman. + * @param save_catalog_callback The callback used to save catalog upon the query + * completion. * @param catalog_database The CatalogDatabase used. * @param bus The TMB. **/ PolicyEnforcerDistributed(const tmb::client_id foreman_client_id, + std::function<void()> &&save_catalog_callback, CatalogDatabaseLite *catalog_database, ShiftbossDirectory *shiftboss_directory, tmb::MessageBus *bus) : PolicyEnforcerBase(catalog_database), foreman_client_id_(foreman_client_id), + save_catalog_callback_(std::move(save_catalog_callback)), shiftboss_directory_(shiftboss_directory), bus_(bus) {} @@ -154,6 +159,8 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase { const tmb::client_id foreman_client_id_; + const std::function<void()> save_catalog_callback_; + ShiftbossDirectory *shiftboss_directory_; tmb::MessageBus *bus_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/27a80558/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp index 45d4fdf..2e18467 100644 --- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp +++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp @@ -21,6 +21,7 @@ #include <cstdio> #include <cstdlib> +#include <functional> #include <memory> #include <set> #include <string> @@ -64,6 +65,12 @@ class CatalogRelation; namespace optimizer { +namespace { + +void nop() {} + +} // namespace + const char *DistributedExecutionGeneratorTestRunner::kResetOption = "reset_before_execution"; @@ -98,7 +105,8 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former // could receive a registration message from the latter. - foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, test_database_loader_->catalog_database()); + foreman_ = make_unique<ForemanDistributed>(*block_locator_, std::bind(&nop), &bus_, + test_database_loader_->catalog_database()); // We don't use the NUMA aware version of worker code. const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,