http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/PolicyEnforcerBase.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp deleted file mode 100644 index b799d5f..0000000 --- a/query_execution/PolicyEnforcerBase.cpp +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#include "query_execution/PolicyEnforcerBase.hpp" - -#include <cstddef> -#include <memory> -#include <queue> -#include <unordered_map> -#include <vector> - -#include "catalog/CatalogDatabase.hpp" -#include "catalog/CatalogRelation.hpp" -#include "catalog/PartitionScheme.hpp" -#include "query_execution/QueryExecutionMessages.pb.h" -#include "query_execution/QueryExecutionState.hpp" -#include "query_execution/QueryExecutionTypedefs.hpp" -#include "query_execution/QueryManagerBase.hpp" -#include "relational_operators/WorkOrder.hpp" -#include "storage/StorageBlockInfo.hpp" - -#include "gflags/gflags.h" -#include "glog/logging.h" - -namespace quickstep { - -DEFINE_bool(profile_and_report_workorder_perf, false, - "If true, Quickstep will record the exceution time of all the individual " - "normal work orders and report it at the end of query execution."); - -DEFINE_bool(visualize_execution_dag, false, - "If true, visualize the execution plan DAG into a graph in DOT " - "format (DOT is a plain text graph description language) which is " - "then printed via stderr."); - -PolicyEnforcerBase::PolicyEnforcerBase(CatalogDatabaseLite *catalog_database) - : catalog_database_(catalog_database), - profile_individual_workorders_(FLAGS_profile_and_report_workorder_perf || FLAGS_visualize_execution_dag) { -} - -void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) { - std::size_t query_id; - QueryManagerBase::dag_node_index op_index; - - switch (tagged_message.message_type()) { - case kWorkOrderCompleteMessage: { - serialization::WorkOrderCompletionMessage proto; - // Note: This proto message contains the time it took to execute the - // WorkOrder. It can be accessed in this scope. - CHECK(proto.ParseFromArray(tagged_message.message(), - tagged_message.message_bytes())); - decrementNumQueuedWorkOrders(proto); - - if (profile_individual_workorders_) { - recordTimeForWorkOrder(proto); - } - - query_id = proto.query_id(); - DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); - - op_index = proto.operator_index(); - admitted_queries_[query_id]->processWorkOrderCompleteMessage(op_index); - break; - } - case kRebuildWorkOrderCompleteMessage: { - serialization::WorkOrderCompletionMessage proto; - // Note: This proto message contains the time it took to execute the - // rebuild WorkOrder. It can be accessed in this scope. - CHECK(proto.ParseFromArray(tagged_message.message(), - tagged_message.message_bytes())); - decrementNumQueuedWorkOrders(proto); - - query_id = proto.query_id(); - DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); - - op_index = proto.operator_index(); - admitted_queries_[query_id]->processRebuildWorkOrderCompleteMessage(op_index); - break; - } - case kCatalogRelationNewBlockMessage: { - serialization::CatalogRelationNewBlockMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), - tagged_message.message_bytes())); - - const block_id block = proto.block_id(); - - CatalogRelation *relation = - static_cast<CatalogDatabase*>(catalog_database_)->getRelationByIdMutable(proto.relation_id()); - relation->addBlock(block); - - if (proto.has_partition_id()) { - relation->getPartitionSchemeMutable()->addBlockToPartition( - proto.partition_id(), block); - } - return; - } - case kDataPipelineMessage: { - serialization::DataPipelineMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), - tagged_message.message_bytes())); - query_id = proto.query_id(); - DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); - - op_index = proto.operator_index(); - admitted_queries_[query_id]->processDataPipelineMessage( - op_index, proto.block_id(), proto.relation_id()); - break; - } - case kWorkOrderFeedbackMessage: { - WorkOrder::FeedbackMessage msg( - const_cast<void *>(tagged_message.message()), - tagged_message.message_bytes()); - query_id = msg.header().query_id; - DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); - - op_index = msg.header().rel_op_index; - admitted_queries_[query_id]->processFeedbackMessage(op_index, msg); - break; - } - default: - LOG(FATAL) << "Unknown message type found in PolicyEnforcer"; - } - if (admitted_queries_[query_id]->queryStatus(op_index) == - QueryManagerBase::QueryStatusCode::kQueryExecuted) { - onQueryCompletion(admitted_queries_[query_id].get()); - - removeQuery(query_id); - if (!waiting_queries_.empty()) { - // Admit the earliest waiting query. - QueryHandle *new_query = waiting_queries_.front(); - waiting_queries_.pop(); - admitQuery(new_query); - } - } -} - -void PolicyEnforcerBase::removeQuery(const std::size_t query_id) { - DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); - if (!admitted_queries_[query_id]->getQueryExecutionState().hasQueryExecutionFinished()) { - LOG(WARNING) << "Removing query with ID " << query_id - << " that hasn't finished its execution"; - } - admitted_queries_.erase(query_id); - - removed_query_ids_.insert(query_id); -} - -bool PolicyEnforcerBase::admitQueries( - const std::vector<QueryHandle*> &query_handles) { - for (QueryHandle *curr_query : query_handles) { - if (!admitQuery(curr_query)) { - return false; - } - } - return true; -} - -void PolicyEnforcerBase::recordTimeForWorkOrder( - const serialization::WorkOrderCompletionMessage &proto) { - const std::size_t query_id = proto.query_id(); - std::vector<WorkOrderTimeEntry> &workorder_time_entries - = workorder_time_recorder_[query_id]; - workorder_time_entries.emplace_back(); - WorkOrderTimeEntry &entry = workorder_time_entries.back(); - entry.worker_id = proto.worker_thread_index(), - entry.operator_id = proto.operator_index(), - entry.start_time = proto.execution_start_time(), - entry.end_time = proto.execution_end_time(); -} - -} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/PolicyEnforcerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp deleted file mode 100644 index baf9c68..0000000 --- a/query_execution/PolicyEnforcerBase.hpp +++ /dev/null @@ -1,208 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_ -#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_ - -#include <cstddef> -#include <memory> -#include <queue> -#include <unordered_map> -#include <unordered_set> -#include <vector> - -#include "query_execution/QueryExecutionTypedefs.hpp" -#include "query_execution/QueryManagerBase.hpp" -#include "utility/Macros.hpp" - -#include "glog/logging.h" - -namespace quickstep { - -class CatalogDatabaseLite; -class QueryHandle; - -namespace serialization { class WorkOrderCompletionMessage; } - -/** \addtogroup QueryExecution - * @{ - */ - -/** - * @brief A base class that ensures that a high level policy is maintained - * in sharing resources among concurrent queries. - **/ -class PolicyEnforcerBase { - public: - /** - * @brief Constructor. - * - * @param catalog_database The CatalogDatabase used. - **/ - explicit PolicyEnforcerBase(CatalogDatabaseLite *catalog_database); - - /** - * @brief Virtual Destructor. - **/ - virtual ~PolicyEnforcerBase() { - if (hasQueries()) { - LOG(WARNING) << "Destructing PolicyEnforcer with some unfinished or " - "waiting queries"; - } - } - - /** - * @brief Admit multiple queries in the system. - * - * @note In the current simple implementation, we only allow one active - * query in the system. Other queries will have to wait. - * - * @param query_handles A vector of QueryHandles for the queries to be - * admitted. - * - * @return True if all the queries were admitted, false if at least one query - * was not admitted. - **/ - bool admitQueries(const std::vector<QueryHandle*> &query_handles); - - /** - * @brief Remove a given query that is under execution. - * - * @note This function is made public so that it is possible for a query - * to be killed. Otherwise, it should only be used privately by the - * class. - * - * TODO(harshad) - Extend this function to support removal of waiting queries. - * - * @param query_id The ID of the query to be removed. - **/ - void removeQuery(const std::size_t query_id); - - /** - * @brief Process a message sent to the Foreman, which gets passed on to the - * policy enforcer. - * - * @param message The message. - **/ - void processMessage(const TaggedMessage &tagged_message); - - /** - * @brief Check if the given query id ever exists. - * - * @return True if the query ever exists, otherwise false. - **/ - inline bool existQuery(const std::size_t query_id) const { - return admitted_queries_.find(query_id) != admitted_queries_.end() || - removed_query_ids_.find(query_id) != removed_query_ids_.end(); - } - - /** - * @brief Check if there are any queries to be executed. - * - * @return True if there is at least one active or waiting query, false if - * the policy enforcer doesn't have any query. - **/ - inline bool hasQueries() const { - return !(admitted_queries_.empty() && waiting_queries_.empty()); - } - - /** - * @brief Get the profiling results for individual work order execution for a - * given query. - * - * @note This function should only be called if profiling individual work - * orders option is enabled. - * - * @param query_id The ID of the query for which the profiling results are - * requested. - * - * @return A vector of records, each being a single profiling entry. - **/ - inline const std::vector<WorkOrderTimeEntry>& getProfilingResults( - const std::size_t query_id) const { - DCHECK(profile_individual_workorders_); - DCHECK(workorder_time_recorder_.find(query_id) != - workorder_time_recorder_.end()); - return workorder_time_recorder_.at(query_id); - } - - /** - * @brief Admit a query to the system. - * - * @param query_handle The QueryHandle for the new query. - * - * @return Whether the query was admitted to the system. - **/ - virtual bool admitQuery(QueryHandle *query_handle) = 0; - - protected: - static constexpr std::size_t kMaxConcurrentQueries = 1; - - /** - * @brief Add custom actions upon the completion of a query. - * - * @param query_manager The query manager. - **/ - virtual void onQueryCompletion(QueryManagerBase *query_manager) {} - - /** - * @brief Record the execution time for a finished WorkOrder. - * - * TODO(harshad) - Extend the functionality to rebuild work orders. - * - * @param proto The completion message proto sent after the WorkOrder - * execution. - **/ - void recordTimeForWorkOrder( - const serialization::WorkOrderCompletionMessage &proto); - - CatalogDatabaseLite *catalog_database_; - - const bool profile_individual_workorders_; - - // Key = query ID, value = QueryManagerBase* for the key query. - std::unordered_map<std::size_t, std::unique_ptr<QueryManagerBase>> admitted_queries_; - - // TODO(quickstep-team): Delete a 'query_id' after receiving all - // 'QueryInitiateResponseMessage's for the 'query_id'. - std::unordered_set<std::size_t> removed_query_ids_; - - // The queries which haven't been admitted yet. - std::queue<QueryHandle*> waiting_queries_; - - WorkOrderTimeRecorder workorder_time_recorder_; - - private: - /** - * @brief Decrement the number of queued workorders for the given worker by 1. - * - * @param proto The completion message proto received after the WorkOrder - * execution. - **/ - virtual void decrementNumQueuedWorkOrders( - const serialization::WorkOrderCompletionMessage &proto) = 0; - - DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerBase); -}; - -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_BASE_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp deleted file mode 100644 index c06fd86..0000000 --- a/query_execution/PolicyEnforcerDistributed.cpp +++ /dev/null @@ -1,266 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -#include "query_execution/PolicyEnforcerDistributed.hpp" - -#include <cstddef> -#include <cstdlib> -#include <memory> -#include <queue> -#include <utility> -#include <unordered_map> -#include <vector> - -#include "catalog/Catalog.pb.h" -#include "catalog/CatalogRelation.hpp" -#include "query_execution/QueryContext.pb.h" -#include "query_execution/QueryExecutionMessages.pb.h" -#include "query_execution/QueryExecutionState.hpp" -#include "query_execution/QueryExecutionTypedefs.hpp" -#include "query_execution/QueryExecutionUtil.hpp" -#include "query_execution/QueryManagerBase.hpp" -#include "query_execution/QueryManagerDistributed.hpp" -#include "query_optimizer/QueryHandle.hpp" -#include "storage/StorageBlockInfo.hpp" - -#include "gflags/gflags.h" -#include "glog/logging.h" - -#include "tmb/address.h" -#include "tmb/id_typedefs.h" -#include "tmb/message_bus.h" -#include "tmb/tagged_message.h" - -using std::free; -using std::malloc; -using std::move; -using std::size_t; -using std::unique_ptr; -using std::vector; - -using tmb::TaggedMessage; - -namespace quickstep { - -namespace S = serialization; - -DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that" - " can be allocated in a single round of dispatch of messages to" - " the workers."); - -void PolicyEnforcerDistributed::getWorkOrderProtoMessages( - vector<unique_ptr<S::WorkOrderMessage>> *work_order_proto_messages) { - // Iterate over admitted queries until either there are no more - // messages available, or the maximum number of messages have - // been collected. - DCHECK(work_order_proto_messages->empty()); - // TODO(harshad) - Make this function generic enough so that it - // works well when multiple queries are getting executed. - if (admitted_queries_.empty()) { - LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running"; - return; - } - - const std::size_t per_query_share = - FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size(); - DCHECK_GT(per_query_share, 0u); - - vector<std::size_t> finished_queries_ids; - - for (const auto &admitted_query_info : admitted_queries_) { - QueryManagerBase *curr_query_manager = admitted_query_info.second.get(); - DCHECK(curr_query_manager != nullptr); - std::size_t messages_collected_curr_query = 0; - while (messages_collected_curr_query < per_query_share) { - S::WorkOrderMessage *next_work_order_message = - static_cast<QueryManagerDistributed*>(curr_query_manager)->getNextWorkOrderMessage(0); - if (next_work_order_message != nullptr) { - ++messages_collected_curr_query; - work_order_proto_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message)); - } else { - // No more work ordes from the current query at this time. - // Check if the query's execution is over. - if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) { - // If the query has been executed, remove it. - finished_queries_ids.push_back(admitted_query_info.first); - } - break; - } - } - } - for (const std::size_t finished_qid : finished_queries_ids) { - onQueryCompletion(admitted_queries_[finished_qid].get()); - removeQuery(finished_qid); - } -} - -bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) { - if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) { - // Ok to admit the query. - const std::size_t query_id = query_handle->query_id(); - if (admitted_queries_.find(query_id) == admitted_queries_.end()) { - // NOTE(zuyu): Should call before constructing a 'QueryManager'. - // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext' - // initializes. - initiateQueryInShiftboss(query_handle); - - // Query with the same ID not present, ok to admit. - admitted_queries_[query_id].reset( - new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_)); - return true; - } else { - LOG(ERROR) << "Query with the same ID " << query_id << " exists"; - return false; - } - } else { - // This query will have to wait. - waiting_queries_.push(query_handle); - return false; - } -} - -void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message) { - S::InitiateRebuildResponseMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - const std::size_t query_id = proto.query_id(); - DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); - - QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get()); - - const std::size_t num_rebuild_work_orders = proto.num_rebuild_work_orders(); - query_manager->processInitiateRebuildResponseMessage(proto.operator_index(), num_rebuild_work_orders); - shiftboss_directory_->addNumQueuedWorkOrders(proto.shiftboss_index(), num_rebuild_work_orders); - - if (query_manager->getQueryExecutionState().hasQueryExecutionFinished()) { - onQueryCompletion(query_manager); - - removeQuery(query_id); - if (!waiting_queries_.empty()) { - // Admit the earliest waiting query. - QueryHandle *new_query = waiting_queries_.front(); - waiting_queries_.pop(); - admitQuery(new_query); - } - } -} - -void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_handle) { - S::QueryInitiateMessage proto; - proto.set_query_id(query_handle->query_id()); - proto.mutable_catalog_database_cache()->MergeFrom(query_handle->getCatalogDatabaseCacheProto()); - proto.mutable_query_context()->MergeFrom(query_handle->getQueryContextProto()); - - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kQueryInitiateMessage); - free(proto_bytes); - - // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses. - tmb::Address shiftboss_addresses; - for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) { - shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i)); - } - - DLOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage - << "') to all Shiftbosses"; - QueryExecutionUtil::BroadcastMessage(foreman_client_id_, - shiftboss_addresses, - move(message), - bus_); -} - -void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manager) { - const QueryHandle *query_handle = query_manager->query_handle(); - - const CatalogRelation *query_result = query_handle->getQueryResultRelation(); - const tmb::client_id cli_id = query_handle->getClientId(); - const std::size_t query_id = query_handle->query_id(); - - // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses. - tmb::Address shiftboss_addresses; - for (std::size_t i = 0; i < shiftboss_directory_->size(); ++i) { - shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i)); - } - - if (query_result == nullptr) { - // Clean up query execution states, i.e., QueryContext, in Shiftbosses. - serialization::QueryTeardownMessage proto; - proto.set_query_id(query_id); - - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kQueryTeardownMessage); - - DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage - << "') to all Shiftbosses"; - QueryExecutionUtil::BroadcastMessage(foreman_client_id_, - shiftboss_addresses, - move(message), - bus_); - - TaggedMessage cli_message(kQueryExecutionSuccessMessage); - - // Notify the CLI query execution successfully. - DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" - << kQueryExecutionSuccessMessage - << "') to CLI with TMB client id " << cli_id; - const tmb::MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - foreman_client_id_, - cli_id, - move(cli_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK); - return; - } - - // NOTE(zuyu): SaveQueryResultMessage implicitly triggers QueryTeardown in Shiftboss. - S::SaveQueryResultMessage proto; - proto.set_query_id(query_id); - proto.set_relation_id(query_result->getID()); - - const vector<block_id> blocks(query_result->getBlocksSnapshot()); - for (const block_id block : blocks) { - proto.add_blocks(block); - } - - proto.set_cli_id(cli_id); - - const size_t proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kSaveQueryResultMessage); - free(proto_bytes); - - // TODO(quickstep-team): Dynamically scale-up/down Shiftbosses. - DLOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage - << "') to all Shiftbosses"; - QueryExecutionUtil::BroadcastMessage(foreman_client_id_, - shiftboss_addresses, - move(message), - bus_); -} - -} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/PolicyEnforcerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp deleted file mode 100644 index 146e9af..0000000 --- a/query_execution/PolicyEnforcerDistributed.hpp +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_ -#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_ - -#include <cstddef> -#include <memory> -#include <vector> - -#include "query_execution/PolicyEnforcerBase.hpp" -#include "query_execution/QueryExecutionMessages.pb.h" -#include "query_execution/ShiftbossDirectory.hpp" -#include "utility/Macros.hpp" - -#include "tmb/id_typedefs.h" - -namespace tmb { -class MessageBus; -class TaggedMessage; -} - -namespace quickstep { - -class CatalogDatabaseLite; -class QueryHandle; -class QueryManagerBase; - -namespace serialization { class WorkOrderMessage; } - -/** \addtogroup QueryExecution - * @{ - */ - -/** - * @brief A class that ensures that a high level policy is maintained - * in sharing resources among concurrent queries. - **/ -class PolicyEnforcerDistributed final : public PolicyEnforcerBase { - public: - /** - * @brief Constructor. - * - * @param foreman_client_id The TMB client ID of the Foreman. - * @param catalog_database The CatalogDatabase used. - * @param bus The TMB. - **/ - PolicyEnforcerDistributed(const tmb::client_id foreman_client_id, - CatalogDatabaseLite *catalog_database, - ShiftbossDirectory *shiftboss_directory, - tmb::MessageBus *bus) - : PolicyEnforcerBase(catalog_database), - foreman_client_id_(foreman_client_id), - shiftboss_directory_(shiftboss_directory), - bus_(bus) {} - - /** - * @brief Destructor. - **/ - ~PolicyEnforcerDistributed() override {} - - bool admitQuery(QueryHandle *query_handle) override; - - /** - * @brief Get work order messages to be dispatched. These messages come from - * the active queries. - * - * @param work_order_proto_messages The work order messages to be dispatched. - **/ - void getWorkOrderProtoMessages( - std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_proto_messages); - - /** - * @brief Process the initiate rebuild work order response message. - * - * @param tagged_message The message. - **/ - void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message); - - private: - void decrementNumQueuedWorkOrders(const serialization::WorkOrderCompletionMessage &proto) override { - shiftboss_directory_->decrementNumQueuedWorkOrders(proto.shiftboss_index()); - } - - void onQueryCompletion(QueryManagerBase *query_manager) override; - - void initiateQueryInShiftboss(QueryHandle *query_handle); - - const tmb::client_id foreman_client_id_; - - ShiftbossDirectory *shiftboss_directory_; - - tmb::MessageBus *bus_; - - DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerDistributed); -}; - -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/PolicyEnforcerSingleNode.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerSingleNode.cpp b/query_execution/PolicyEnforcerSingleNode.cpp deleted file mode 100644 index 0aa2ca8..0000000 --- a/query_execution/PolicyEnforcerSingleNode.cpp +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#include "query_execution/PolicyEnforcerSingleNode.hpp" - -#include <cstddef> -#include <memory> -#include <queue> -#include <utility> -#include <unordered_map> -#include <vector> - -#include "catalog/CatalogTypedefs.hpp" -#include "query_execution/QueryExecutionState.hpp" -#include "query_execution/QueryManagerBase.hpp" -#include "query_execution/QueryManagerSingleNode.hpp" -#include "query_execution/WorkerDirectory.hpp" -#include "query_execution/WorkerMessage.hpp" -#include "query_optimizer/QueryHandle.hpp" - -#include "gflags/gflags.h" -#include "glog/logging.h" - -namespace quickstep { - -DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that" - " can be allocated in a single round of dispatch of messages to" - " the workers."); - -void PolicyEnforcerSingleNode::getWorkerMessages( - std::vector<std::unique_ptr<WorkerMessage>> *worker_messages) { - // Iterate over admitted queries until either there are no more - // messages available, or the maximum number of messages have - // been collected. - DCHECK(worker_messages->empty()); - // TODO(harshad) - Make this function generic enough so that it - // works well when multiple queries are getting executed. - std::size_t per_query_share = 0; - if (!admitted_queries_.empty()) { - per_query_share = FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size(); - } else { - LOG(WARNING) << "Requesting WorkerMessages when no query is running"; - return; - } - DCHECK_GT(per_query_share, 0u); - std::vector<std::size_t> finished_queries_ids; - - for (const auto &admitted_query_info : admitted_queries_) { - QueryManagerBase *curr_query_manager = admitted_query_info.second.get(); - DCHECK(curr_query_manager != nullptr); - std::size_t messages_collected_curr_query = 0; - while (messages_collected_curr_query < per_query_share) { - WorkerMessage *next_worker_message = - static_cast<QueryManagerSingleNode*>(curr_query_manager)->getNextWorkerMessage(0, kAnyNUMANodeID); - if (next_worker_message != nullptr) { - ++messages_collected_curr_query; - worker_messages->push_back(std::unique_ptr<WorkerMessage>(next_worker_message)); - } else { - // No more work ordes from the current query at this time. - // Check if the query's execution is over. - if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) { - // If the query has been executed, remove it. - finished_queries_ids.push_back(admitted_query_info.first); - } - break; - } - } - } - for (const std::size_t finished_qid : finished_queries_ids) { - removeQuery(finished_qid); - } -} - -bool PolicyEnforcerSingleNode::admitQuery(QueryHandle *query_handle) { - if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) { - // Ok to admit the query. - const std::size_t query_id = query_handle->query_id(); - if (admitted_queries_.find(query_id) == admitted_queries_.end()) { - // Query with the same ID not present, ok to admit. - admitted_queries_[query_id].reset( - new QueryManagerSingleNode(foreman_client_id_, num_numa_nodes_, query_handle, - catalog_database_, storage_manager_, bus_)); - return true; - } else { - LOG(ERROR) << "Query with the same ID " << query_id << " exists"; - return false; - } - } else { - // This query will have to wait. - waiting_queries_.push(query_handle); - return false; - } -} - -} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/PolicyEnforcerSingleNode.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerSingleNode.hpp b/query_execution/PolicyEnforcerSingleNode.hpp deleted file mode 100644 index f87d670..0000000 --- a/query_execution/PolicyEnforcerSingleNode.hpp +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_ -#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_ - -#include <cstddef> -#include <memory> -#include <vector> - -#include "query_execution/PolicyEnforcerBase.hpp" -#include "query_execution/QueryExecutionMessages.pb.h" -#include "query_execution/WorkerDirectory.hpp" -#include "utility/Macros.hpp" - -#include "tmb/id_typedefs.h" - -namespace tmb { class MessageBus; } - -namespace quickstep { - -class CatalogDatabaseLite; -class QueryHandle; -class StorageManager; -class WorkerMessage; - -/** \addtogroup QueryExecution - * @{ - */ - -/** - * @brief A class that ensures that a high level policy is maintained - * in sharing resources among concurrent queries. - **/ -class PolicyEnforcerSingleNode final : public PolicyEnforcerBase { - public: - /** - * @brief Constructor. - * - * @param foreman_client_id The TMB client ID of the Foreman. - * @param num_numa_nodes Number of NUMA nodes used by the system. - * @param catalog_database The CatalogDatabase used. - * @param storage_manager The StorageManager used. - * @param bus The TMB. - **/ - PolicyEnforcerSingleNode(const tmb::client_id foreman_client_id, - const std::size_t num_numa_nodes, - CatalogDatabaseLite *catalog_database, - StorageManager *storage_manager, - WorkerDirectory *worker_directory, - tmb::MessageBus *bus) - : PolicyEnforcerBase(catalog_database), - foreman_client_id_(foreman_client_id), - num_numa_nodes_(num_numa_nodes), - storage_manager_(storage_manager), - worker_directory_(worker_directory), - bus_(bus) {} - - /** - * @brief Destructor. - **/ - ~PolicyEnforcerSingleNode() override {} - - bool admitQuery(QueryHandle *query_handle) override; - - /** - * @brief Get worker messages to be dispatched. These worker messages come - * from the active queries. - * - * @param worker_messages The worker messages to be dispatched. - **/ - void getWorkerMessages( - std::vector<std::unique_ptr<WorkerMessage>> *worker_messages); - - private: - void decrementNumQueuedWorkOrders(const serialization::WorkOrderCompletionMessage &proto) override { - worker_directory_->decrementNumQueuedWorkOrders(proto.worker_thread_index()); - } - - const tmb::client_id foreman_client_id_; - const std::size_t num_numa_nodes_; - - StorageManager *storage_manager_; - WorkerDirectory *worker_directory_; - - tmb::MessageBus *bus_; - - DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerSingleNode); -}; - -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_SINGLE_NODE_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryContext.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp deleted file mode 100644 index 0e6636d..0000000 --- a/query_execution/QueryContext.cpp +++ /dev/null @@ -1,266 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#include "query_execution/QueryContext.hpp" - -#include <memory> -#include <unordered_map> -#include <utility> -#include <vector> - -#include "catalog/CatalogDatabaseLite.hpp" -#include "catalog/CatalogRelationSchema.hpp" -#include "catalog/CatalogTypedefs.hpp" -#include "expressions/ExpressionFactories.hpp" -#include "expressions/table_generator/GeneratorFunction.pb.h" -#include "expressions/table_generator/GeneratorFunctionFactory.hpp" -#include "expressions/table_generator/GeneratorFunctionHandle.hpp" -#include "query_execution/QueryContext.pb.h" -#include "storage/AggregationOperationState.hpp" -#include "storage/HashTable.hpp" -#include "storage/HashTableFactory.hpp" -#include "storage/InsertDestination.hpp" -#include "storage/InsertDestination.pb.h" -#include "types/TypedValue.hpp" -#include "types/containers/Tuple.hpp" -#include "utility/SortConfiguration.hpp" -#include "utility/lip_filter/LIPFilter.hpp" -#include "utility/lip_filter/LIPFilterDeployment.hpp" -#include "utility/lip_filter/LIPFilterFactory.hpp" - -#include "glog/logging.h" - -#include "tmb/id_typedefs.h" - -using std::move; -using std::unique_ptr; -using std::vector; - -namespace quickstep { - -QueryContext::QueryContext(const serialization::QueryContext &proto, - const CatalogDatabaseLite &database, - StorageManager *storage_manager, - const tmb::client_id scheduler_client_id, - tmb::MessageBus *bus) { - DCHECK(ProtoIsValid(proto, database)) - << "Attempted to create QueryContext from an invalid proto description:\n" - << proto.DebugString(); - - for (int i = 0; i < proto.aggregation_states_size(); ++i) { - aggregation_states_.emplace_back( - AggregationOperationState::ReconstructFromProto(proto.aggregation_states(i), - database, - storage_manager)); - } - - for (int i = 0; i < proto.generator_functions_size(); ++i) { - const GeneratorFunctionHandle *func_handle = - GeneratorFunctionFactory::Instance().reconstructFromProto(proto.generator_functions(i)); - DCHECK(func_handle != nullptr); - generator_functions_.emplace_back( - std::unique_ptr<const GeneratorFunctionHandle>(func_handle)); - } - - for (int i = 0; i < proto.join_hash_tables_size(); ++i) { - join_hash_tables_.emplace_back( - JoinHashTableFactory::CreateResizableFromProto(proto.join_hash_tables(i), - storage_manager)); - } - - for (int i = 0; i < proto.insert_destinations_size(); ++i) { - const serialization::InsertDestination &insert_destination_proto = proto.insert_destinations(i); - insert_destinations_.emplace_back(InsertDestination::ReconstructFromProto( - proto.query_id(), - insert_destination_proto, - database.getRelationSchemaById(insert_destination_proto.relation_id()), - storage_manager, - scheduler_client_id, - bus)); - } - - for (int i = 0; i < proto.lip_filters_size(); ++i) { - lip_filters_.emplace_back( - std::unique_ptr<LIPFilter>( - LIPFilterFactory::ReconstructFromProto(proto.lip_filters(i)))); - } - - for (int i = 0; i < proto.lip_filter_deployments_size(); ++i) { - lip_deployments_.emplace_back( - std::make_unique<LIPFilterDeployment>( - proto.lip_filter_deployments(i), lip_filters_)); - } - - for (int i = 0; i < proto.predicates_size(); ++i) { - predicates_.emplace_back( - PredicateFactory::ReconstructFromProto(proto.predicates(i), database)); - } - - for (int i = 0; i < proto.scalar_groups_size(); ++i) { - vector<unique_ptr<const Scalar>> scalar_group; - - const serialization::QueryContext::ScalarGroup &scalar_group_proto = proto.scalar_groups(i); - for (int j = 0; j < scalar_group_proto.scalars_size(); ++j) { - scalar_group.emplace_back( - ScalarFactory::ReconstructFromProto(scalar_group_proto.scalars(j), database)); - } - - scalar_groups_.push_back(move(scalar_group)); - } - - for (int i = 0; i < proto.sort_configs_size(); ++i) { - sort_configs_.emplace_back( - SortConfiguration::ReconstructFromProto(proto.sort_configs(i), database)); - } - - for (int i = 0; i < proto.tuples_size(); ++i) { - tuples_.emplace_back(Tuple::ReconstructFromProto(proto.tuples(i))); - } - - for (int i = 0; i < proto.update_groups_size(); ++i) { - const serialization::QueryContext::UpdateGroup &update_group_proto = proto.update_groups(i); - - std::unordered_map<attribute_id, std::unique_ptr<const Scalar>> update_group; - for (int j = 0; j < update_group_proto.update_assignments_size(); ++j) { - const serialization::QueryContext::UpdateGroup::UpdateAssignment &update_assignment_proto = - update_group_proto.update_assignments(j); - - unique_ptr<const Scalar> scalar( - ScalarFactory::ReconstructFromProto(update_assignment_proto.scalar(), database)); - - update_group.emplace(update_assignment_proto.attribute_id(), move(scalar)); - } - - update_groups_.push_back(move(update_group)); - } - - for (int i = 0; i < proto.window_aggregation_states_size(); ++i) { - window_aggregation_states_.emplace_back( - WindowAggregationOperationState::ReconstructFromProto(proto.window_aggregation_states(i), - database, - storage_manager)); - } -} - -bool QueryContext::ProtoIsValid(const serialization::QueryContext &proto, - const CatalogDatabaseLite &database) { - for (int i = 0; i < proto.aggregation_states_size(); ++i) { - if (!AggregationOperationState::ProtoIsValid(proto.aggregation_states(i), database)) { - return false; - } - } - - // Each GeneratorFunctionHandle object is serialized as a function name with - // a list of arguments. Here checks that the arguments are valid TypedValue's. - for (int i = 0; i < proto.generator_functions_size(); ++i) { - const serialization::GeneratorFunctionHandle &func_proto = proto.generator_functions(i); - for (int j = 0; j < func_proto.args_size(); ++j) { - if (!TypedValue::ProtoIsValid(func_proto.args(j))) { - return false; - } - } - } - - for (int i = 0; i < proto.join_hash_tables_size(); ++i) { - if (!JoinHashTableFactory::ProtoIsValid(proto.join_hash_tables(i))) { - return false; - } - } - - for (int i = 0; i < proto.insert_destinations_size(); ++i) { - const serialization::InsertDestination &insert_destination_proto = proto.insert_destinations(i); - const relation_id rel_id = insert_destination_proto.relation_id(); - - if (!database.hasRelationWithId(rel_id) || - !InsertDestination::ProtoIsValid(insert_destination_proto, - database.getRelationSchemaById(rel_id))) { - return false; - } - } - - for (int i = 0; i < proto.lip_filters_size(); ++i) { - if (!LIPFilterFactory::ProtoIsValid(proto.lip_filters(i))) { - return false; - } - } - - for (int i = 0; i < proto.lip_filter_deployments_size(); ++i) { - if (!LIPFilterDeployment::ProtoIsValid(proto.lip_filter_deployments(i))) { - return false; - } - } - - for (int i = 0; i < proto.predicates_size(); ++i) { - if (!PredicateFactory::ProtoIsValid(proto.predicates(i), database)) { - return false; - } - } - - for (int i = 0; i < proto.scalar_groups_size(); ++i) { - const serialization::QueryContext::ScalarGroup &scalar_group_proto = proto.scalar_groups(i); - for (int j = 0; j < scalar_group_proto.scalars_size(); ++j) { - if (!ScalarFactory::ProtoIsValid(scalar_group_proto.scalars(j), database)) { - return false; - } - } - } - - for (int i = 0; i < proto.sort_configs_size(); ++i) { - if (!SortConfiguration::ProtoIsValid(proto.sort_configs(i), database)) { - return false; - } - } - - for (int i = 0; i < proto.tuples_size(); ++i) { - if (!Tuple::ProtoIsValid(proto.tuples(i))) { - return false; - } - } - - for (int i = 0; i < proto.update_groups_size(); ++i) { - const serialization::QueryContext::UpdateGroup &update_group_proto = proto.update_groups(i); - - const relation_id rel_id = update_group_proto.relation_id(); - if (!database.hasRelationWithId(rel_id)) { - return false; - } - const CatalogRelationSchema &rel = database.getRelationSchemaById(rel_id); - - for (int j = 0; j < update_group_proto.update_assignments_size(); ++j) { - const serialization::QueryContext::UpdateGroup::UpdateAssignment &update_assignment_proto = - update_group_proto.update_assignments(j); - - if (!rel.hasAttributeWithId(update_assignment_proto.attribute_id()) || - !ScalarFactory::ProtoIsValid(update_assignment_proto.scalar(), database)) { - return false; - } - } - } - - for (int i = 0; i < proto.window_aggregation_states_size(); ++i) { - if (!WindowAggregationOperationState::ProtoIsValid(proto.window_aggregation_states(i), - database)) { - return false; - } - } - - return proto.IsInitialized(); -} - -} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryContext.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp deleted file mode 100644 index 7ad8fa1..0000000 --- a/query_execution/QueryContext.hpp +++ /dev/null @@ -1,585 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_CONTEXT_HPP_ -#define QUICKSTEP_QUERY_EXECUTION_QUERY_CONTEXT_HPP_ - -#include <cstddef> -#include <cstdint> -#include <memory> -#include <unordered_map> -#include <vector> - -#include "catalog/CatalogTypedefs.hpp" -#include "expressions/predicate/Predicate.hpp" -#include "expressions/scalar/Scalar.hpp" -#include "expressions/table_generator/GeneratorFunctionHandle.hpp" -#include "storage/AggregationOperationState.hpp" -#include "storage/HashTable.hpp" -#include "storage/InsertDestination.hpp" -#include "storage/WindowAggregationOperationState.hpp" -#include "types/containers/Tuple.hpp" -#include "utility/Macros.hpp" -#include "utility/SortConfiguration.hpp" -#include "utility/lip_filter/LIPFilter.hpp" -#include "utility/lip_filter/LIPFilterDeployment.hpp" - -#include "glog/logging.h" - -#include "tmb/id_typedefs.h" - -namespace tmb { class MessageBus; } - -namespace quickstep { - -class CatalogDatabaseLite; -class StorageManager; - -namespace serialization { class QueryContext; } - -/** \addtogroup QueryExecution - * @{ - */ - -/** - * @brief The QueryContext stores stateful execution info per query. - **/ -class QueryContext { - public: - /** - * @brief A unique identifier for an AggregationOperationState per query. - **/ - typedef std::uint32_t aggregation_state_id; - - /** - * @brief A unique identifier for a GeneratorFunctionHandle per query. - **/ - typedef std::uint32_t generator_function_id; - - /** - * @brief A unique identifier for an InsertDestination per query. - * - * @note A negative value indicates a nonexistent InsertDestination. - **/ - typedef std::int32_t insert_destination_id; - static constexpr insert_destination_id kInvalidInsertDestinationId = static_cast<insert_destination_id>(-1); - - /** - * @brief A unique identifier for a JoinHashTable per query. - **/ - typedef std::uint32_t join_hash_table_id; - - /** - * @brief A unique identifier for a LIPFilterDeployment per query. - **/ - typedef std::int32_t lip_deployment_id; - static constexpr lip_deployment_id kInvalidLIPDeploymentId = static_cast<lip_deployment_id>(-1); - - /** - * @brief A unique identifier for a LIPFilter per query. - **/ - typedef std::uint32_t lip_filter_id; - - /** - * @brief A unique identifier for a Predicate per query. - * - * @note A negative value indicates a null Predicate. - **/ - typedef std::int32_t predicate_id; - static constexpr predicate_id kInvalidPredicateId = static_cast<predicate_id>(-1); - - /** - * @brief A unique identifier for a group of Scalars per query. - * - * @note A negative value indicates a nonexistent ScalarGroup. - **/ - typedef std::int32_t scalar_group_id; - static constexpr scalar_group_id kInvalidScalarGroupId = static_cast<scalar_group_id>(-1); - - /** - * @brief A unique identifier for a SortConfiguration per query. - **/ - typedef std::uint32_t sort_config_id; - - /** - * @brief A unique identifier for a Tuple to be inserted per query. - **/ - typedef std::uint32_t tuple_id; - - /** - * @brief A unique identifier for a group of UpdateAssignments per query. - **/ - typedef std::uint32_t update_group_id; - - /** - * @brief A unique identifier for a window aggregation state. - **/ - typedef std::uint32_t window_aggregation_state_id; - - /** - * @brief Constructor. - * - * @param proto A serialized Protocol Buffer representation of a - * QueryContext, originally generated by the optimizer. - * @param database The Database to resolve relation and attribute references - * in. - * @param storage_manager The StorageManager to use. - * @param scheduler_client_id The TMB client ID of the scheduler thread. - * @param bus A pointer to the TMB. - **/ - QueryContext(const serialization::QueryContext &proto, - const CatalogDatabaseLite &database, - StorageManager *storage_manager, - const tmb::client_id scheduler_client_id, - tmb::MessageBus *bus); - - ~QueryContext() {} - - /** - * @brief Check whether a serialization::QueryContext is fully-formed and - * all parts are valid. - * - * @param proto A serialized Protocol Buffer representation of a QueryContext, - * originally generated by the optimizer. - * @param database The Database to resolve relation and attribute references - * in. - * @return Whether proto is fully-formed and valid. - **/ - static bool ProtoIsValid(const serialization::QueryContext &proto, - const CatalogDatabaseLite &database); - - /** - * @brief Whether the given AggregationOperationState id is valid. - * - * @param id The AggregationOperationState id. - * - * @return True if valid, otherwise false. - **/ - bool isValidAggregationStateId(const aggregation_state_id id) const { - return id < aggregation_states_.size(); - } - - /** - * @brief Get the AggregationOperationState. - * - * @param id The AggregationOperationState id in the query. - * - * @return The AggregationOperationState, alreadly created in the constructor. - **/ - inline AggregationOperationState* getAggregationState(const aggregation_state_id id) { - DCHECK_LT(id, aggregation_states_.size()); - DCHECK(aggregation_states_[id]); - return aggregation_states_[id].get(); - } - - /** - * @brief Destroy the given aggregation state. - * - * @param id The ID of the AggregationOperationState to destroy. - **/ - inline void destroyAggregationState(const aggregation_state_id id) { - DCHECK_LT(id, aggregation_states_.size()); - DCHECK(aggregation_states_[id]); - aggregation_states_[id].reset(nullptr); - } - - /** - * @brief Destroy the payloads from the aggregation hash tables. - * - * @warning After calling these methods, the hash table will be in an invalid - * state. No other operation should be performed on them. - * - * @param id The ID of the AggregationOperationState. - **/ - inline void destroyAggregationHashTablePayload(const aggregation_state_id id) { - DCHECK_LT(id, aggregation_states_.size()); - DCHECK(aggregation_states_[id]); - aggregation_states_[id]->destroyAggregationHashTablePayload(); - } - - /** - * @brief Whether the given GeneratorFunctionHandle id is valid. - * - * @param id The GeneratorFunctionHandle id. - * - * @return True if valid, otherwise false. - **/ - bool isValidGeneratorFunctionId(const generator_function_id id) const { - return id < generator_functions_.size(); - } - - /** - * @brief Get the GeneratorFunctionHandle. - * - * @param id The GeneratorFunctionHandle id in the query. - * - * @return The GeneratorFunctionHandle, alreadly created in the constructor. - **/ - inline const GeneratorFunctionHandle& getGeneratorFunctionHandle( - const generator_function_id id) { - DCHECK_LT(static_cast<std::size_t>(id), generator_functions_.size()); - return *generator_functions_[id]; - } - - /** - * @brief Whether the given InsertDestination id is valid. - * - * @param id The InsertDestination id. - * - * @return True if valid, otherwise false. - **/ - bool isValidInsertDestinationId(const insert_destination_id id) const { - return id != kInvalidInsertDestinationId - && id >= 0 - && static_cast<std::size_t>(id) < insert_destinations_.size(); - } - - /** - * @brief Get the InsertDestination. - * - * @param id The InsertDestination id in the query. - * - * @return The InsertDestination, alreadly created in the constructor. - **/ - inline InsertDestination* getInsertDestination(const insert_destination_id id) { - DCHECK_GE(id, 0); - DCHECK_LT(static_cast<std::size_t>(id), insert_destinations_.size()); - return insert_destinations_[id].get(); - } - - /** - * @brief Destory the given InsertDestination. - * - * @param id The id of the InsertDestination to destroy. - **/ - inline void destroyInsertDestination(const insert_destination_id id) { - DCHECK_GE(id, 0); - DCHECK_LT(static_cast<std::size_t>(id), insert_destinations_.size()); - insert_destinations_[id].reset(); - } - - /** - * @brief Whether the given JoinHashTable id is valid. - * - * @param id The JoinHashTable id. - * - * @return True if valid, otherwise false. - **/ - bool isValidJoinHashTableId(const join_hash_table_id id) const { - return id < join_hash_tables_.size(); - } - - /** - * @brief Get the JoinHashTable. - * - * @param id The JoinHashTable id in the query. - * - * @return The JoinHashTable, already created in the constructor. - **/ - inline JoinHashTable* getJoinHashTable(const join_hash_table_id id) { - DCHECK_LT(id, join_hash_tables_.size()); - return join_hash_tables_[id].get(); - } - - /** - * @brief Destory the given JoinHashTable. - * - * @param id The id of the JoinHashTable to destroy. - **/ - inline void destroyJoinHashTable(const join_hash_table_id id) { - DCHECK_LT(id, join_hash_tables_.size()); - join_hash_tables_[id].reset(); - } - - /** - * @brief Whether the given LIPFilterDeployment id is valid. - * - * @param id The LIPFilterDeployment id. - * - * @return True if valid, otherwise false. - **/ - bool isValidLIPDeploymentId(const lip_deployment_id id) const { - return static_cast<std::size_t>(id) < lip_deployments_.size(); - } - - /** - * @brief Get a constant pointer to the LIPFilterDeployment. - * - * @param id The LIPFilterDeployment id. - * - * @return The constant pointer to LIPFilterDeployment that is - * already created in the constructor. - **/ - inline const LIPFilterDeployment* getLIPDeployment( - const lip_deployment_id id) const { - DCHECK_LT(static_cast<std::size_t>(id), lip_deployments_.size()); - return lip_deployments_[id].get(); - } - - /** - * @brief Destory the given LIPFilterDeployment. - * - * @param id The id of the LIPFilterDeployment to destroy. - **/ - inline void destroyLIPDeployment(const lip_deployment_id id) { - DCHECK_LT(static_cast<std::size_t>(id), lip_deployments_.size()); - lip_deployments_[id].reset(); - } - - /** - * @brief Whether the given LIPFilter id is valid. - * - * @param id The LIPFilter id. - * - * @return True if valid, otherwise false. - **/ - bool isValidLIPFilterId(const lip_filter_id id) const { - return id < lip_filters_.size(); - } - - /** - * @brief Get a mutable reference to the LIPFilter. - * - * @param id The LIPFilter id. - * - * @return The LIPFilter, already created in the constructor. - **/ - inline LIPFilter* getLIPFilterMutable(const lip_filter_id id) { - DCHECK_LT(id, lip_filters_.size()); - return lip_filters_[id].get(); - } - - /** - * @brief Get a constant pointer to the LIPFilter. - * - * @param id The LIPFilter id. - * - * @return The constant pointer to LIPFilter that is - * already created in the constructor. - **/ - inline const LIPFilter* getLIPFilter(const lip_filter_id id) const { - DCHECK_LT(id, lip_filters_.size()); - return lip_filters_[id].get(); - } - - /** - * @brief Destory the given LIPFilter. - * - * @param id The id of the LIPFilter to destroy. - **/ - inline void destroyLIPFilter(const lip_filter_id id) { - DCHECK_LT(id, lip_filters_.size()); - lip_filters_[id].reset(); - } - - /** - * @brief Whether the given Predicate id is valid or no predicate. - * - * @param id The Predicate id. - * - * @return True if valid or no predicate, otherwise false. - **/ - bool isValidPredicate(const predicate_id id) const { - return (id == kInvalidPredicateId) // No predicate. - || (id >= 0 && static_cast<std::size_t>(id) < predicates_.size()); - } - - /** - * @brief Get the const Predicate. - * - * @param id The Predicate id in the query. - * - * @return The const Predicate (alreadly created in the constructor), or - * nullptr for the given invalid id. - **/ - inline const Predicate* getPredicate(const predicate_id id) { - if (id == kInvalidPredicateId) { - return nullptr; - } - - DCHECK_GE(id, 0); - DCHECK_LT(static_cast<std::size_t>(id), predicates_.size()); - return predicates_[id].get(); - } - - /** - * @brief Whether the given Scalar group id is valid. - * - * @param id The Scalar group id. - * - * @return True if valid, otherwise false. - **/ - bool isValidScalarGroupId(const scalar_group_id id) const { - return id != kInvalidScalarGroupId - && id >= 0 - && static_cast<std::size_t>(id) < scalar_groups_.size(); - } - - /** - * @brief Get the group of Scalars. - * - * @param id The Scalar group id in the query. - * - * @return The reference to the Scalar group, alreadly created in the - * constructor. - **/ - inline const std::vector<std::unique_ptr<const Scalar>>& getScalarGroup(const scalar_group_id id) { - DCHECK_GE(id, 0); - DCHECK_LT(static_cast<std::size_t>(id), scalar_groups_.size()); - return scalar_groups_[id]; - } - - /** - * @brief Whether the given SortConfiguration id is valid. - * - * @param id The SortConfiguration id. - * - * @return True if valid, otherwise false. - **/ - bool isValidSortConfigId(const sort_config_id id) const { - return id < sort_configs_.size(); - } - - /** - * @brief Get the SortConfiguration. - * - * @param id The SortConfiguration id in the query. - * - * @return The SortConfiguration, alreadly created in the constructor. - **/ - inline const SortConfiguration& getSortConfig(const sort_config_id id) { - DCHECK_LT(id, sort_configs_.size()); - return *sort_configs_[id]; - } - - /** - * @brief Whether the given Tuple id is valid. - * - * @param id The Tuple id. - * - * @return True if valid, otherwise false. - **/ - bool isValidTupleId(const tuple_id id) const { - return id < tuples_.size(); - } - - /** - * @brief Release the ownership of the Tuple referenced by the id. - * - * @note Each id should use only once. - * - * @param id The Tuple id in the query. - * - * @return The Tuple, alreadly created in the constructor. - **/ - inline Tuple* releaseTuple(const tuple_id id) { - DCHECK_LT(id, tuples_.size()); - DCHECK(tuples_[id]); - return tuples_[id].release(); - } - - /** - * @brief Whether the given update assignments group id is valid. - * - * @param id The group id of the update assignments. - * - * @return True if valid, otherwise false. - **/ - bool isValidUpdateGroupId(const update_group_id id) const { - return static_cast<std::size_t>(id) < update_groups_.size(); - } - - /** - * @brief Get the group of the update assignments for UpdateWorkOrder. - * - * @param id The group id of the update assignments in the query. - * - * @return The reference to the update assignments group, alreadly created in the - * constructor. - **/ - inline const std::unordered_map<attribute_id, std::unique_ptr<const Scalar>>& getUpdateGroup( - const update_group_id id) { - DCHECK_LT(static_cast<std::size_t>(id), update_groups_.size()); - DCHECK(!update_groups_[id].empty()); - return update_groups_[id]; - } - - /** - * @brief Whether the given WindowAggregationOperationState id is valid. - * - * @param id The WindowAggregationOperationState id. - * - * @return True if valid, otherwise false. - **/ - bool isValidWindowAggregationStateId(const window_aggregation_state_id id) const { - return id < window_aggregation_states_.size(); - } - - /** - * @brief Get the WindowAggregationOperationState. - * - * @param id The WindowAggregationOperationState id in the query. - * - * @return The WindowAggregationOperationState, already created in the - * constructor. - **/ - inline WindowAggregationOperationState* getWindowAggregationState( - const window_aggregation_state_id id) { - DCHECK_LT(id, window_aggregation_states_.size()); - DCHECK(window_aggregation_states_[id]); - return window_aggregation_states_[id].get(); - } - - /** - * @brief Release the given WindowAggregationOperationState. - * - * @param id The id of the WindowAggregationOperationState to destroy. - * - * @return The WindowAggregationOperationState, already created in the - * constructor. - **/ - inline WindowAggregationOperationState* releaseWindowAggregationState( - const window_aggregation_state_id id) { - DCHECK_LT(id, window_aggregation_states_.size()); - DCHECK(window_aggregation_states_[id]); - return window_aggregation_states_[id].release(); - } - - private: - std::vector<std::unique_ptr<AggregationOperationState>> aggregation_states_; - std::vector<std::unique_ptr<const GeneratorFunctionHandle>> generator_functions_; - std::vector<std::unique_ptr<InsertDestination>> insert_destinations_; - std::vector<std::unique_ptr<JoinHashTable>> join_hash_tables_; - std::vector<std::unique_ptr<LIPFilterDeployment>> lip_deployments_; - std::vector<std::unique_ptr<LIPFilter>> lip_filters_; - std::vector<std::unique_ptr<const Predicate>> predicates_; - std::vector<std::vector<std::unique_ptr<const Scalar>>> scalar_groups_; - std::vector<std::unique_ptr<const SortConfiguration>> sort_configs_; - std::vector<std::unique_ptr<Tuple>> tuples_; - std::vector<std::unordered_map<attribute_id, std::unique_ptr<const Scalar>>> update_groups_; - std::vector<std::unique_ptr<WindowAggregationOperationState>> window_aggregation_states_; - - DISALLOW_COPY_AND_ASSIGN(QueryContext); -}; - -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_CONTEXT_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryContext.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto deleted file mode 100644 index ab0f520..0000000 --- a/query_execution/QueryContext.proto +++ /dev/null @@ -1,65 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -syntax = "proto2"; - -package quickstep.serialization; - -import "expressions/Expressions.proto"; -import "expressions/table_generator/GeneratorFunction.proto"; -import "storage/AggregationOperationState.proto"; -import "storage/HashTable.proto"; -import "storage/InsertDestination.proto"; -import "storage/WindowAggregationOperationState.proto"; -import "types/containers/Tuple.proto"; -import "utility/SortConfiguration.proto"; -import "utility/lip_filter/LIPFilter.proto"; - -message QueryContext { - message ScalarGroup { - repeated Scalar scalars = 1; - } - - message UpdateGroup { - message UpdateAssignment { - required int32 attribute_id = 1; - required Scalar scalar = 2; - } - - // NOTE(zuyu): Only used for validating UpdateAssignment's attribute_id. - required int32 relation_id = 1; - repeated UpdateAssignment update_assignments = 2; - } - - repeated AggregationOperationState aggregation_states = 1; - repeated GeneratorFunctionHandle generator_functions = 2; - repeated HashTable join_hash_tables = 3; - repeated InsertDestination insert_destinations = 4; - repeated LIPFilter lip_filters = 5; - repeated LIPFilterDeployment lip_filter_deployments = 6; - repeated Predicate predicates = 7; - repeated ScalarGroup scalar_groups = 8; - repeated SortConfiguration sort_configs = 9; - repeated Tuple tuples = 10; - - // NOTE(zuyu): For UpdateWorkOrder only. - repeated UpdateGroup update_groups = 11; - - repeated WindowAggregationOperationState window_aggregation_states = 12; - - required uint64 query_id = 13; -} http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto deleted file mode 100644 index e6d741a..0000000 --- a/query_execution/QueryExecutionMessages.proto +++ /dev/null @@ -1,162 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -syntax = "proto2"; - -package quickstep.serialization; - -import "catalog/Catalog.proto"; -import "query_execution/QueryContext.proto"; -import "relational_operators/WorkOrder.proto"; - -// Note: There are different types of completion messages for normal work orders -// rebuild work orders. This can be potentially helpful when we want to collect -// different statistics for executing different types of work orders. -// e.g. In select normal work order completion message, we could be interested -// in the selectivity of the block whose work order got execute. In rebuild work -// order completion message, we may be interested in adding the compression -// ratio or dictionary size of the rebuilt block. - -message WorkOrderCompletionMessage { - enum WorkOrderType { - NORMAL = 0; - REBUILD = 1; - } - - required WorkOrderType work_order_type = 1; - - required uint64 operator_index = 2; - required uint64 worker_thread_index = 3; - required uint64 query_id = 4; - - // Epoch time in microseconds. - optional uint64 execution_start_time = 5; - optional uint64 execution_end_time = 6; - - // Required in the distributed version. - optional uint64 shiftboss_index = 7; -} - -message CatalogRelationNewBlockMessage { - required int32 relation_id = 1; - required fixed64 block_id = 2; - - // Used by PartitionAwareInsertDestination. - optional uint64 partition_id = 3; - required uint64 query_id = 4; -} - -message DataPipelineMessage { - required uint64 operator_index = 1; - required fixed64 block_id = 2; - required int32 relation_id = 3; - required uint64 query_id = 4; -} - -// Distributed version related messages. -message ShiftbossRegistrationMessage { - // The total Work Order processing capacity in Shiftboss, which equals to the - // sum of the capacity of each worker managed by Shiftboss. - required uint64 work_order_capacity = 1; -} - -message ShiftbossRegistrationResponseMessage { - required uint64 shiftboss_index = 1; -} - -message QueryInitiateMessage { - required uint64 query_id = 1; - required CatalogDatabase catalog_database_cache = 2; - required QueryContext query_context = 3; -} - -message QueryInitiateResponseMessage { - required uint64 query_id = 1; -} - -message WorkOrderMessage { - required uint64 query_id = 1; - required uint64 operator_index = 2; - required WorkOrder work_order = 3; -} - -message InitiateRebuildMessage { - required uint64 query_id = 1; - required uint64 operator_index = 2; - required uint64 insert_destination_index = 3; - required int32 relation_id = 4; -} - -message InitiateRebuildResponseMessage { - required uint64 query_id = 1; - required uint64 operator_index = 2; - required uint64 num_rebuild_work_orders = 3; - required uint64 shiftboss_index = 4; -} - -message QueryTeardownMessage { - required uint64 query_id = 1; -} - -message SaveQueryResultMessage { - required uint64 query_id = 1; - required int32 relation_id = 2; - repeated fixed64 blocks = 3 [packed=true]; - - required uint32 cli_id = 4; // tmb::client_id. -} - -message SaveQueryResultResponseMessage { - required uint64 query_id = 1; - required int32 relation_id = 2; - required uint32 cli_id = 3; // tmb::client_id. - required uint64 shiftboss_index = 4; -} - -message QueryExecutionSuccessMessage { - optional CatalogRelationSchema result_relation = 1; -} - -// BlockLocator related messages. -message BlockDomainRegistrationMessage { - // Format IP:Port, i.e., "0.0.0.0:0". - required string domain_network_address = 1; -} - -// Used for RegistrationResponse, Unregistration, and FailureReport. -message BlockDomainMessage { - required uint32 block_domain = 1; -} - -// Used when StorageManager loads or evicts a block or a blob from its buffer -// pool. -message BlockLocationMessage { - required fixed64 block_id = 1; - required uint32 block_domain = 2; -} - -message BlockMessage { - required fixed64 block_id = 1; -} - -message LocateBlockResponseMessage { - repeated uint32 block_domains = 1; -} - -message GetPeerDomainNetworkAddressesResponseMessage { - repeated string domain_network_addresses = 1; -} http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryExecutionModule.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionModule.hpp b/query_execution/QueryExecutionModule.hpp deleted file mode 100644 index 89979f1..0000000 --- a/query_execution/QueryExecutionModule.hpp +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -/** @defgroup QueryExecution - * - * The components of query execution including Workers, Foreman (co-ordinator), - * message classes for queues and for inter thread communication. -**/ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/156290a4/query_execution/QueryExecutionState.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionState.hpp b/query_execution/QueryExecutionState.hpp deleted file mode 100644 index f5281d5..0000000 --- a/query_execution/QueryExecutionState.hpp +++ /dev/null @@ -1,329 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - **/ - -#ifndef QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_STATE_HPP_ -#define QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_STATE_HPP_ - -#include <cstddef> -#include <unordered_map> -#include <utility> -#include <vector> - -#include "utility/Macros.hpp" - -#include "glog/logging.h" - -namespace quickstep { - -/** \addtogroup QueryExecution - * @{ - */ - -/** - * @brief A class that tracks the state of the execution of a query which - * includes status of operators, number of dispatched work orders etc. - **/ -class QueryExecutionState { - public: - /** - * @brief Constructor. - * - * @param num_operators Number of relational operators in the query. - **/ - explicit QueryExecutionState(const std::size_t num_operators) - : num_operators_(num_operators), - num_operators_finished_(0), - queued_workorders_per_op_(num_operators, 0), - rebuild_required_(num_operators, false), - done_gen_(num_operators, false), - execution_finished_(num_operators, false) {} - - /** - * @brief Get the number of operators in the query. - **/ - inline const std::size_t getNumOperators() const { - return num_operators_; - } - - /** - * @brief Get the number of operators who have finished their execution. - **/ - inline const std::size_t getNumOperatorsFinished() const { - return num_operators_finished_; - } - - /** - * @brief Check if the query has finished its execution. - * - * @return True if the query has finished its execution, false otherwise. - **/ - inline bool hasQueryExecutionFinished() const { - return num_operators_finished_ == num_operators_; - } - - /** - * @brief Set the rebuild status of the given operator that includes the - * flag for whether the rebuild has been initiated and if so, the - * number of pending rebuild work orders. - * - * @param operator_index The index of the given operator. - * @param num_rebuild_workorders The number of rebuild workorders of the given - * operator. - * @param rebuild_initiated True if the rebuild has been initiated, false - * otherwise. - **/ - inline void setRebuildStatus(const std::size_t operator_index, - const std::size_t num_rebuild_workorders, - const bool rebuild_initiated) { - DCHECK(operator_index < num_operators_); - auto search_res = rebuild_status_.find(operator_index); - if (search_res != rebuild_status_.end()) { - search_res->second.has_initiated = rebuild_initiated; - search_res->second.num_pending_workorders = num_rebuild_workorders; - } else { - RebuildStatus rebuild_status(rebuild_initiated, num_rebuild_workorders); - - rebuild_status_.emplace(operator_index, std::move(rebuild_status)); - } - } - - /** - * @brief Check if the rebuild has been initiated for the given operator. - * - * @param operator_index The index of the given operator. - * - * @return True if the rebuild has been initiated, false otherwise. - **/ - inline bool hasRebuildInitiated(const std::size_t operator_index) const { - DCHECK(operator_index < num_operators_); - const auto search_res = rebuild_status_.find(operator_index); - if (search_res != rebuild_status_.end()) { - return search_res->second.has_initiated; - } - return false; - } - - /** - * @brief Get the number of pending rebuild workorders for the given operator. - * - * @param operator_index The index of the given operator. - * - * @return The number of pending rebuild workorders for the given operator. - **/ - inline const std::size_t getNumRebuildWorkOrders( - const std::size_t operator_index) const { - DCHECK(operator_index < num_operators_); - const auto search_res = rebuild_status_.find(operator_index); - if (search_res != rebuild_status_.end()) { - return search_res->second.num_pending_workorders; - } - LOG(WARNING) << "Called QueryExecutionState::getNumRebuildWorkOrders() " - "for an operator whose rebuild entry doesn't exist."; - return 0; - } - - /** - * @brief Increment the number of rebuild WorkOrders for the given operator. - * - * @param operator_index The index of the given operator. - * @param num_rebuild_workorders The number of rebuild workorders of the given - * operator. - **/ - inline void incrementNumRebuildWorkOrders(const std::size_t operator_index, - const std::size_t num_rebuild_workorders) { - DCHECK_LT(operator_index, num_operators_); - auto search_res = rebuild_status_.find(operator_index); - DCHECK(search_res != rebuild_status_.end()) - << "Called for an operator whose rebuild status does not exist."; - DCHECK(search_res->second.has_initiated); - - search_res->second.num_pending_workorders += num_rebuild_workorders; - } - - /** - * @brief Decrement the number of rebuild WorkOrders for the given operator. - * - * @param operator_index The index of the given operator. - **/ - inline void decrementNumRebuildWorkOrders(const std::size_t operator_index) { - DCHECK(operator_index < num_operators_); - auto search_res = rebuild_status_.find(operator_index); - CHECK(search_res != rebuild_status_.end()) - << "Called QueryExecutionState::decrementNumRebuildWorkOrders() for an " - "operator whose rebuild entry doesn't exist."; - - DCHECK(search_res->second.has_initiated); - DCHECK_GE(search_res->second.num_pending_workorders, 1u); - - --(search_res->second.num_pending_workorders); - } - - /** - * @brief Increment the number of queued (normal) WorkOrders for the given - * operator. - * - * @param operator_index The index of the given operator. - **/ - inline void incrementNumQueuedWorkOrders(const std::size_t operator_index) { - DCHECK(operator_index < num_operators_); - ++queued_workorders_per_op_[operator_index]; - } - - /** - * @brief Decrement the number of queued (normal) WorkOrders for the given - * operator. - * - * @param operator_index The index of the given operator. - **/ - inline void decrementNumQueuedWorkOrders(const std::size_t operator_index) { - DCHECK(operator_index < num_operators_); - DCHECK_GT(queued_workorders_per_op_[operator_index], 0u); - --queued_workorders_per_op_[operator_index]; - } - - /** - * @brief Get the number of queued (normal) WorkOrders for the given operator. - * - * @note Queued WorkOrders mean those WorkOrders which have been dispatched - * for execution by the Foreman and haven't yet completed. These are - * different from pending WorkOrders which mean the WorkOrders that - * haven't been dispatched for execution yet. - * - * @param operator_index The index of the given operator. - * - * @return The number of queued (normal) WorkOrders for the given operators. - **/ - inline const std::size_t getNumQueuedWorkOrders( - const std::size_t operator_index) const { - DCHECK(operator_index < num_operators_); - return queued_workorders_per_op_[operator_index]; - } - - /** - * @brief Set the rebuild required flag as true for the given operator. - * - * @param operator_index The index of the given operator. - **/ - inline void setRebuildRequired(const std::size_t operator_index) { - DCHECK(operator_index < num_operators_); - rebuild_required_[operator_index] = true; - } - - /** - * @brief Get the rebuild required flag for the given operator. - * - * @param operator_index The index of the given operator. - **/ - inline bool isRebuildRequired(const std::size_t operator_index) const { - DCHECK(operator_index < num_operators_); - return rebuild_required_[operator_index]; - } - - /** - * @brief Set the execution finished flag for the given operator as true. - * - * @note By default this flag is false. - * - * @param operator_index The index of the given operator. - **/ - inline void setExecutionFinished(const std::size_t operator_index) { - DCHECK(operator_index < num_operators_); - execution_finished_[operator_index] = true; - ++num_operators_finished_; - } - - /** - * @brief Get the execution finished flag for the given operator. - * - * @param operator_index The index of the given operator. - **/ - inline bool hasExecutionFinished(const std::size_t operator_index) const { - DCHECK(operator_index < num_operators_); - return execution_finished_[operator_index]; - } - - /** - * @brief Set the "done generation of workorders" flag as true for the given - * operator. - * - * @note By default this flag is false. - * - * @param operator_index The index of the given operator. - **/ - inline void setDoneGenerationWorkOrders(const std::size_t operator_index) { - DCHECK(operator_index < num_operators_); - done_gen_[operator_index] = true; - } - - /** - * @brief Get the "done generation of workorders" flag for the given operator. - * - * @param operator_index The index of the given operator. - **/ - inline bool hasDoneGenerationWorkOrders(const std::size_t operator_index) - const { - DCHECK(operator_index < num_operators_); - return done_gen_[operator_index]; - } - - private: - // Total number of operators in the query. - const std::size_t num_operators_; - - // Number of operators who've finished their execution. - std::size_t num_operators_finished_; - - // A vector to track the number of normal WorkOrders in execution. - std::vector<std::size_t> queued_workorders_per_op_; - - // The ith bit denotes if the operator with ID = i requires generation of - // rebuild WorkOrders. - std::vector<bool> rebuild_required_; - - // The ith bit denotes if the operator with ID = i has finished generating - // work orders. - std::vector<bool> done_gen_; - - // The ith bit denotes if the operator with ID = i has finished its execution. - std::vector<bool> execution_finished_; - - struct RebuildStatus { - RebuildStatus(const bool initiated, - const std::size_t num_workorders) - : has_initiated(initiated), - num_pending_workorders(num_workorders) {} - - // Whether rebuild for operator at index i has been initiated. - bool has_initiated; - // The number of pending rebuild workorders for the operator. - // Valid if and only if 'has_initiated' is true. - std::size_t num_pending_workorders; - }; - - // Key is dag_node_index for which rebuild is required. - std::unordered_map<std::size_t, RebuildStatus> rebuild_status_; - - DISALLOW_COPY_AND_ASSIGN(QueryExecutionState); -}; - -/** @} */ - -} // namespace quickstep - -#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_EXECUTION_STATE_HPP_