Refactored catalog saving in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ccb2852f Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ccb2852f Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ccb2852f Branch: refs/heads/reorder-partitioned-hash-join Commit: ccb2852f71da77d364d4bfcb276cb6318b751a8c Parents: 964a806 Author: Zuyu Zhang <zu...@apache.org> Authored: Thu Mar 2 17:14:50 2017 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Thu Mar 2 17:14:50 2017 -0800 ---------------------------------------------------------------------- query_execution/CMakeLists.txt | 1 + query_execution/ForemanDistributed.cpp | 5 ++--- query_execution/ForemanDistributed.hpp | 6 +++--- query_execution/PolicyEnforcerDistributed.cpp | 5 ++++- query_execution/PolicyEnforcerDistributed.hpp | 13 ++++++------- .../tests/DistributedExecutionGeneratorTestRunner.cpp | 6 ++---- 6 files changed, 18 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index 12d6be0..23b706f 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -165,6 +165,7 @@ if (ENABLE_DISTRIBUTED) quickstep_queryexecution_QueryManagerDistributed quickstep_queryexecution_ShiftbossDirectory quickstep_queryoptimizer_QueryHandle + quickstep_queryoptimizer_QueryProcessor quickstep_storage_StorageBlockInfo quickstep_utility_ExecutionDAGVisualizer quickstep_utility_Macros http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp index 389d6ab..57f432f 100644 --- a/query_execution/ForemanDistributed.cpp +++ b/query_execution/ForemanDistributed.cpp @@ -17,7 +17,6 @@ #include <cstddef> #include <cstdio> #include <cstdlib> -#include <functional> #include <memory> #include <unordered_map> #include <unordered_set> @@ -68,9 +67,9 @@ class QueryHandle; ForemanDistributed::ForemanDistributed( const BlockLocator &block_locator, - std::function<void()> &&save_catalog_callback, MessageBus *bus, CatalogDatabaseLite *catalog_database, + QueryProcessor *query_processor, const int cpu_id) : ForemanBase(bus, cpu_id), block_locator_(block_locator), @@ -108,8 +107,8 @@ ForemanDistributed::ForemanDistributed( policy_enforcer_ = std::make_unique<PolicyEnforcerDistributed>( foreman_client_id_, - move(save_catalog_callback), catalog_database_, + query_processor, &shiftboss_directory_, bus_); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/ForemanDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp index 5f1a14b..7fc98bd 100644 --- a/query_execution/ForemanDistributed.hpp +++ b/query_execution/ForemanDistributed.hpp @@ -17,7 +17,6 @@ #include <cstddef> #include <cstdio> -#include <functional> #include <memory> #include <unordered_map> #include <unordered_set> @@ -36,6 +35,7 @@ namespace quickstep { class BlockLocator; class CatalogDatabaseLite; +class QueryProcessor; namespace serialization { class WorkOrderMessage; } @@ -56,7 +56,7 @@ class ForemanDistributed final : public ForemanBase { * @param block_locator The block locator that manages block location info. * @param bus A pointer to the TMB. * @param catalog_database The catalog database where this query is executed. - * @param save_catalog_callback The callback used to save catalog upon the query + * @param query_processor The QueryProcessor to save catalog upon the query * completion. * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned. * @@ -65,9 +65,9 @@ class ForemanDistributed final : public ForemanBase { **/ ForemanDistributed( const BlockLocator &block_locator, - std::function<void()> &&save_catalog_callback, tmb::MessageBus *bus, CatalogDatabaseLite *catalog_database, + QueryProcessor *query_processor, const int cpu_id = -1); ~ForemanDistributed() override {} http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp index 6ee58a8..25f2d72 100644 --- a/query_execution/PolicyEnforcerDistributed.cpp +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -35,6 +35,7 @@ #include "query_execution/QueryManagerBase.hpp" #include "query_execution/QueryManagerDistributed.hpp" #include "query_optimizer/QueryHandle.hpp" +#include "query_optimizer/QueryProcessor.hpp" #include "storage/StorageBlockInfo.hpp" #include "utility/ExecutionDAGVisualizer.hpp" @@ -259,7 +260,9 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage } if (query_result == nullptr) { - save_catalog_callback_(); + if (query_processor_) { + query_processor_->saveCatalog(); + } // Clean up query execution states, i.e., QueryContext, in Shiftbosses. serialization::QueryTeardownMessage proto; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_execution/PolicyEnforcerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp index f8476c8..18fd9ae 100644 --- a/query_execution/PolicyEnforcerDistributed.hpp +++ b/query_execution/PolicyEnforcerDistributed.hpp @@ -16,7 +16,6 @@ #define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_ #include <cstddef> -#include <functional> #include <memory> #include <unordered_map> #include <utility> @@ -43,6 +42,7 @@ class TaggedMessage; namespace quickstep { class CatalogDatabaseLite; +class QueryProcessor; /** \addtogroup QueryExecution * @{ @@ -58,19 +58,19 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase { * @brief Constructor. * * @param foreman_client_id The TMB client ID of the Foreman. - * @param save_catalog_callback The callback used to save catalog upon the query - * completion. * @param catalog_database The CatalogDatabase used. + * @param query_processor The QueryProcessor to save catalog upon the query + * completion. * @param bus The TMB. **/ PolicyEnforcerDistributed(const tmb::client_id foreman_client_id, - std::function<void()> &&save_catalog_callback, CatalogDatabaseLite *catalog_database, + QueryProcessor *query_processor, ShiftbossDirectory *shiftboss_directory, tmb::MessageBus *bus) : PolicyEnforcerBase(catalog_database), foreman_client_id_(foreman_client_id), - save_catalog_callback_(std::move(save_catalog_callback)), + query_processor_(query_processor), shiftboss_directory_(shiftboss_directory), bus_(bus) {} @@ -159,8 +159,7 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase { const tmb::client_id foreman_client_id_; - const std::function<void()> save_catalog_callback_; - + QueryProcessor *query_processor_; ShiftbossDirectory *shiftboss_directory_; tmb::MessageBus *bus_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ccb2852f/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp index 3b1259a..0eeb83f 100644 --- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp +++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp @@ -71,8 +71,6 @@ namespace { constexpr int kNumInstances = 3; -void nop() {} - } // namespace const char *DistributedExecutionGeneratorTestRunner::kResetOption = @@ -110,8 +108,8 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former // could receive a registration message from the latter. - foreman_ = make_unique<ForemanDistributed>(*block_locator_, std::bind(&nop), &bus_, - test_database_loader_->catalog_database()); + foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, test_database_loader_->catalog_database(), + nullptr /* query_processor */); // We don't use the NUMA aware version of worker code. const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */,