Marked SingleNodeQuery for Insertions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0859a17a Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0859a17a Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0859a17a Branch: refs/heads/quickstep_partition_parser_support Commit: 0859a17aa4e71ef8d3d261f15e52518b39f617f6 Parents: e50a2b7 Author: Zuyu Zhang <zu...@apache.org> Authored: Sun Dec 4 14:11:58 2016 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Sun Dec 4 15:44:24 2016 -0800 ---------------------------------------------------------------------- query_execution/ForemanDistributed.cpp | 16 +++++++--- query_execution/PolicyEnforcerDistributed.hpp | 23 +++++++++++--- query_optimizer/ExecutionGenerator.cpp | 3 ++ query_optimizer/QueryHandle.hpp | 37 ++++++++++++++++++++++ 4 files changed, 71 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp index 61f0603..0dad8b0 100644 --- a/query_execution/ForemanDistributed.cpp +++ b/query_execution/ForemanDistributed.cpp @@ -295,14 +295,22 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p return true; } +namespace { +constexpr size_t kDefaultShiftbossIndex = 0u; +} // namespace + void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) { - const size_t num_shiftbosses = shiftboss_directory_.size(); - size_t shiftboss_index = 0u; + static size_t shiftboss_index = kDefaultShiftbossIndex; + + PolicyEnforcerDistributed* policy_enforcer_dist = static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get()); for (const 
auto &message : messages) { DCHECK(message != nullptr); const S::WorkOrderMessage &proto = *message; size_t shiftboss_index_for_particular_work_order_type; - if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) { + if (policy_enforcer_dist->isSingleNodeQuery(proto.query_id())) { + // Always schedule the single-node query to the same Shiftboss. + shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex; + } else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) { } else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) { } else { // TODO(zuyu): Take data-locality into account for scheduling. @@ -313,7 +321,7 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo shiftboss_directory_.incrementNumQueuedWorkOrders(shiftboss_index_for_particular_work_order_type); if (shiftboss_index == shiftboss_index_for_particular_work_order_type) { - shiftboss_index = (shiftboss_index + 1) % num_shiftbosses; + shiftboss_index = (shiftboss_index + 1) % shiftboss_directory_.size(); } else { // NOTE(zuyu): This is not the exact round-robin scheduling, as in this case, // <shiftboss_index_for_particular_work_order_type> might be scheduled one http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_execution/PolicyEnforcerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp index e8bc394..2c00a6b 100644 --- a/query_execution/PolicyEnforcerDistributed.hpp +++ b/query_execution/PolicyEnforcerDistributed.hpp @@ -17,14 +17,20 @@ #include <cstddef> #include <memory> +#include <unordered_map> +#include <utility> #include <vector> #include "query_execution/PolicyEnforcerBase.hpp" #include "query_execution/QueryContext.hpp" 
#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/QueryManagerBase.hpp" #include "query_execution/ShiftbossDirectory.hpp" +#include "query_optimizer/QueryHandle.hpp" #include "utility/Macros.hpp" +#include "glog/logging.h" + #include "tmb/id_typedefs.h" namespace tmb { @@ -35,10 +41,6 @@ class TaggedMessage; namespace quickstep { class CatalogDatabaseLite; -class QueryHandle; -class QueryManagerBase; - -namespace serialization { class WorkOrderMessage; } /** \addtogroup QueryExecution * @{ @@ -90,6 +92,19 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase { void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message); /** + * @brief Whether the query must be executed on a single Shiftboss. + * + * @param query_id The id of an admitted query; this is enforced only by a DCHECK. + * + * @return Whether the query must be executed on a single Shiftboss. + **/ + bool isSingleNodeQuery(const std::size_t query_id) const { + const auto cit = admitted_queries_.find(query_id); + DCHECK(cit != admitted_queries_.end()); + return cit->second->query_handle()->is_single_node_query(); + } + + /** * @brief Get or set the index of Shiftboss for an Aggregation related * WorkOrder. If it is the first Aggregation on <aggr_state_index>, * <shiftboss_index> will be set to <next_shiftboss_index_to_schedule>. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_optimizer/ExecutionGenerator.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp index 2e0d8f3..5a2c450 100644 --- a/query_optimizer/ExecutionGenerator.cpp +++ b/query_optimizer/ExecutionGenerator.cpp @@ -1096,6 +1096,9 @@ void ExecutionGenerator::convertDropTable( void ExecutionGenerator::convertInsertTuple( const P::InsertTuplePtr &physical_plan) { // InsertTuple is converted to an Insert and a SaveBlocks.
+#ifdef QUICKSTEP_DISTRIBUTED + query_handle_->set_is_single_node_query(); +#endif // QUICKSTEP_DISTRIBUTED const CatalogRelationInfo *input_relation_info = findRelationInfoOutputByPhysical(physical_plan->input()); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0859a17a/query_optimizer/QueryHandle.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/QueryHandle.hpp b/query_optimizer/QueryHandle.hpp index 1ca6021..cbd1cd9 100644 --- a/query_optimizer/QueryHandle.hpp +++ b/query_optimizer/QueryHandle.hpp @@ -26,6 +26,7 @@ #include "catalog/Catalog.pb.h" #include "query_execution/QueryContext.pb.h" +#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED. #include "query_optimizer/QueryPlan.hpp" #include "utility/Macros.hpp" @@ -134,6 +135,22 @@ class QueryHandle { query_result_relation_ = relation; } +#ifdef QUICKSTEP_DISTRIBUTED + /** + * @brief Whether the query will be executed on a single node. + */ + bool is_single_node_query() const { + return is_single_node_query_; + } + + /** + * @brief Mark the query to be executed on a single node. + */ + void set_is_single_node_query() { + is_single_node_query_ = true; + } +#endif // QUICKSTEP_DISTRIBUTED + private: const std::size_t query_id_; @@ -153,6 +170,26 @@ class QueryHandle { // and deleted by the Cli shell. const CatalogRelation *query_result_relation_; +#ifdef QUICKSTEP_DISTRIBUTED + // Indicates whether the query should be executed on the default Shiftboss for + // correctness purposes. + // An example is an insert query, which might otherwise require block + // invalidation across multiple StorageManagers. Suppose an insert query + // has been scheduled on node 0, so the block is in the buffer pool of node 0. + // Another insert query on the same relation might be scheduled on another + // node, say node 1, which would pull the block from node 0 and perform the + // insertion.
Then two copies of a block with the same block id on two nodes + // would have different contents, which is incorrect. + // One approach is to evict the copies cached on all other nodes on every + // change. However, that does not scale and, even worse, would also hurt + // the performance of every select query. + // Instead, we mark the query as a single-node query so that blocks are + // modified on the default node only. But if the changed block has also been + // cached on another node, this approach would still produce inconsistent + // query results. + bool is_single_node_query_ = false; +#endif // QUICKSTEP_DISTRIBUTED + DISALLOW_COPY_AND_ASSIGN(QueryHandle); };