Repository: incubator-quickstep Updated Branches: refs/heads/mark_single_node_query [created] 5e03cdd17
Marked SingleNodeQuery for Insertions. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5e03cdd1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5e03cdd1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5e03cdd1 Branch: refs/heads/mark_single_node_query Commit: 5e03cdd17ca30679890065587c0c0c2bab819118 Parents: c608e99 Author: Zuyu Zhang <zu...@apache.org> Authored: Sun Dec 4 14:11:58 2016 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Sun Dec 4 14:11:58 2016 -0800 ---------------------------------------------------------------------- query_execution/ForemanDistributed.cpp | 16 +++++++--- query_execution/PolicyEnforcerDistributed.cpp | 6 ++++ query_execution/PolicyEnforcerDistributed.hpp | 11 +++++-- query_optimizer/ExecutionGenerator.cpp | 3 ++ query_optimizer/QueryHandle.hpp | 37 ++++++++++++++++++++++ 5 files changed, 67 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e03cdd1/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp index 61f0603..0dad8b0 100644 --- a/query_execution/ForemanDistributed.cpp +++ b/query_execution/ForemanDistributed.cpp @@ -295,14 +295,22 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p return true; } +namespace { +constexpr size_t kDefaultShiftbossIndex = 0u; +} // namespace + void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) { - const size_t num_shiftbosses = shiftboss_directory_.size(); - size_t shiftboss_index = 0u; + static size_t shiftboss_index = kDefaultShiftbossIndex; + + PolicyEnforcerDistributed* policy_enforcer_dist = 
static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get()); for (const auto &message : messages) { DCHECK(message != nullptr); const S::WorkOrderMessage &proto = *message; size_t shiftboss_index_for_particular_work_order_type; - if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) { + if (policy_enforcer_dist->isSingleNodeQuery(proto.query_id())) { + // Always schedule the single-node query to the same Shiftboss. + shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex; + } else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) { } else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) { } else { // TODO(zuyu): Take data-locality into account for scheduling. @@ -313,7 +321,7 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_particular_work_order_type); if (shiftboss_index == shiftboss_index_for_particular_work_order_type) { - shiftboss_index = (shiftboss_index + 1) % num_shiftbosses; + shiftboss_index = (shiftboss_index + 1) % shiftboss_directory_.size(); } else { // NOTE(zuyu): This is not the exact round-robin scheduling, as in this case, // <shiftboss_index_for_particular_work_order_type> might be scheduled one http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e03cdd1/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp index c5642bc..5d1deb6 100644 --- a/query_execution/PolicyEnforcerDistributed.cpp +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -158,6 +158,12 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb: } } +bool 
PolicyEnforcerDistributed::isSingleNodeQuery(const std::size_t query_id) const { + const auto cit = admitted_queries_.find(query_id); + DCHECK(cit != admitted_queries_.end()); + return cit->second->query_handle()->is_single_node_query(); +} + void PolicyEnforcerDistributed::getShiftbossIndexForAggregation( const std::size_t query_id, const QueryContext::aggregation_state_id aggr_state_index, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e03cdd1/query_execution/PolicyEnforcerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp index e8bc394..eaab9f7 100644 --- a/query_execution/PolicyEnforcerDistributed.hpp +++ b/query_execution/PolicyEnforcerDistributed.hpp @@ -38,8 +38,6 @@ class CatalogDatabaseLite; class QueryHandle; class QueryManagerBase; -namespace serialization { class WorkOrderMessage; } - /** \addtogroup QueryExecution * @{ */ @@ -90,6 +88,15 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase { void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message); /** + * @brief Whether the query should be executed on one Shiftboss. + * + * @param query_id The query id. + * + * @return Whether the query should be executed on one Shiftboss. + **/ + bool isSingleNodeQuery(const std::size_t query_id) const; + + /** * @brief Get or set the index of Shiftboss for an Aggregation related * WorkOrder. If it is the first Aggregation on <aggr_state_index>, * <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>. 
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e03cdd1/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index 2e0d8f3..5a2c450 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -1096,6 +1096,9 @@ void ExecutionGenerator::convertDropTable( void ExecutionGenerator::convertInsertTuple( const P::InsertTuplePtr &physical_plan) { // InsertTuple is converted to an Insert and a SaveBlocks. +#ifdef QUICKSTEP_DISTRIBUTED + query_handle_->set_is_single_node_query(); +#endif // QUICKSTEP_DISTRIBUTED const CatalogRelationInfo *input_relation_info = findRelationInfoOutputByPhysical(physical_plan->input()); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5e03cdd1/query_optimizer/QueryHandle.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp index 1ca6021..cbd1cd9 100644 --- a/query_optimizer/QueryHandle.hpp +++ b/query_optimizer/QueryHandle.hpp @@ -26,6 +26,7 @@ #include "catalog/Catalog.pb.h" #include "query_execution/QueryContext.pb.h" +#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED. #include "query_optimizer/QueryPlan.hpp" #include "utility/Macros.hpp" @@ -134,6 +135,22 @@ class QueryHandle { query_result_relation_ = relation; } +#ifdef QUICKSTEP_DISTRIBUTED + /** + * @brief Whether the query will be executed in the single node. + */ + bool is_single_node_query() const { + return is_single_node_query_; + } + + /** + * @brief Set the query to be executed in the single node. + */ + void set_is_single_node_query() { + is_single_node_query_ = true; + } +#endif // QUICKSTEP_DISTRIBUTED + private: const std::size_t query_id_; @@ -153,6 +170,26 @@ class QueryHandle { // and deleted by the Cli shell. 
const CatalogRelation *query_result_relation_; +#ifdef QUICKSTEP_DISTRIBUTED + // Indicates whether the query should be executed on the default Shiftboss for + // correctness purposes. + // An example would be the insert query that might otherwise need block + // invalidation among multiple StorageManagers. In this case, an insert query + // has been scheduled on node 0, and the block is in the buffer pool of node 0. + // Another insert query on the same relation might be scheduled on another + // node, say node 1, which will pull the block from node 0, and do the + // insertion. Thus, two blocks with the same block id in two nodes + // have different contents, which is incorrect. + // One approach is to evict blocks cached in all other nodes for every + // change. It, however, does not scale, and even worse, it will also affect + // the performance of each select query. + // Alternatively, we choose to mark the query as a single-node query to + // modify blocks on the default node only. But if the changed block has also + // been cached in another node, this approach would still produce inconsistent + // query results. + bool is_single_node_query_ = false; +#endif // QUICKSTEP_DISTRIBUTED + DISALLOW_COPY_AND_ASSIGN(QueryHandle); };