Added PolicyEnforcer implementation for the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3c2749ea Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3c2749ea Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3c2749ea Branch: refs/heads/partitioned-aggregation Commit: 3c2749eafcff25283915bdef21822056d36f5281 Parents: e443b2b Author: Zuyu Zhang <zu...@twitter.com> Authored: Thu Aug 4 11:45:51 2016 -0700 Committer: Zuyu Zhang <zu...@twitter.com> Committed: Mon Aug 8 10:35:45 2016 -0700 ---------------------------------------------------------------------- query_execution/CMakeLists.txt | 24 ++ query_execution/PolicyEnforcerBase.cpp | 2 + query_execution/PolicyEnforcerBase.hpp | 7 + query_execution/PolicyEnforcerDistributed.cpp | 279 +++++++++++++++++++++ query_execution/PolicyEnforcerDistributed.hpp | 113 +++++++++ query_execution/QueryExecutionMessages.proto | 16 +- query_execution/QueryExecutionTypedefs.hpp | 5 + query_execution/QueryManagerBase.cpp | 3 +- query_execution/QueryManagerBase.hpp | 11 +- 9 files changed, 456 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index 4b180e3..74fcafb 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -35,6 +35,9 @@ endif() add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp) add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp) add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp) +if (ENABLE_DISTRIBUTED) + add_library(quickstep_queryexecution_PolicyEnforcerDistributed PolicyEnforcerDistributed.cpp PolicyEnforcerDistributed.hpp) +endif(ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_PolicyEnforcerSingleNode PolicyEnforcerSingleNode.cpp PolicyEnforcerSingleNode.hpp) add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp) add_library(quickstep_queryexecution_QueryContext_proto @@ -110,6 +113,26 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcerBase quickstep_storage_StorageBlockInfo quickstep_utility_Macros tmb) +if (ENABLE_DISTRIBUTED) + target_link_libraries(quickstep_queryexecution_PolicyEnforcerDistributed + glog + quickstep_catalog_CatalogRelation + quickstep_catalog_Catalog_proto + quickstep_queryexecution_PolicyEnforcerBase + quickstep_queryexecution_QueryContext_proto + quickstep_queryexecution_QueryExecutionMessages_proto + quickstep_queryexecution_QueryExecutionState + quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_QueryExecutionUtil + quickstep_queryexecution_QueryManagerBase + quickstep_queryexecution_QueryManagerDistributed + quickstep_queryexecution_ShiftbossDirectory + quickstep_queryoptimizer_QueryHandle + quickstep_storage_StorageBlockInfo + quickstep_utility_Macros + tmb + ${GFLAGS_LIB_NAME}) +endif(ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution_PolicyEnforcerSingleNode glog quickstep_catalog_CatalogTypedefs @@ -293,6 +316,7 @@ target_link_libraries(quickstep_queryexecution if (ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution quickstep_queryexecution_BlockLocator + quickstep_queryexecution_PolicyEnforcerDistributed quickstep_queryexecution_QueryManagerDistributed quickstep_queryexecution_Shiftboss quickstep_queryexecution_ShiftbossDirectory) http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/PolicyEnforcerBase.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp index bf6edf9..78f7b44 100644 --- a/query_execution/PolicyEnforcerBase.cpp +++ b/query_execution/PolicyEnforcerBase.cpp @@ -136,6 +136,8 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) { } if (admitted_queries_[query_id]->queryStatus(op_index) == QueryManagerBase::QueryStatusCode::kQueryExecuted) { + onQueryCompletion(admitted_queries_[query_id].get()); + removeQuery(query_id); if (!waiting_queries_.empty()) { // Admit the earliest waiting query. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/PolicyEnforcerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerBase.hpp b/query_execution/PolicyEnforcerBase.hpp index c75a531..e95799e 100644 --- a/query_execution/PolicyEnforcerBase.hpp +++ b/query_execution/PolicyEnforcerBase.hpp @@ -140,6 +140,13 @@ class PolicyEnforcerBase { static constexpr std::size_t kMaxConcurrentQueries = 1; /** + * @brief Add custom actions upon the completion of a query. + * + * @param query_manager The query manager. + **/ + virtual void onQueryCompletion(QueryManagerBase *query_manager) {} + + /** * @brief Record the execution time for a finished WorkOrder. * * TODO(harshad) - Extend the functionality to rebuild work orders. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp new file mode 100644 index 0000000..6d0de47 --- /dev/null +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -0,0 +1,279 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#include "query_execution/PolicyEnforcerDistributed.hpp" + +#include <cstddef> +#include <cstdlib> +#include <memory> +#include <queue> +#include <utility> +#include <unordered_map> +#include <vector> + +#include "catalog/Catalog.pb.h" +#include "catalog/CatalogRelation.hpp" +#include "query_execution/QueryContext.pb.h" +#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/QueryExecutionState.hpp" +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/QueryExecutionUtil.hpp" +#include "query_execution/QueryManagerBase.hpp" +#include "query_execution/QueryManagerDistributed.hpp" +#include "query_optimizer/QueryHandle.hpp" +#include "storage/StorageBlockInfo.hpp" + +#include "gflags/gflags.h" +#include "glog/logging.h" + +#include "tmb/id_typedefs.h" +#include "tmb/message_bus.h" +#include "tmb/tagged_message.h" + +using std::free; +using std::malloc; +using std::move; +using std::size_t; +using std::unique_ptr; +using std::vector; + +using tmb::TaggedMessage; + +namespace quickstep { + +namespace S = serialization; + +DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that" + " can be allocated in a single round of dispatch of messages to" + " the workers."); + +void PolicyEnforcerDistributed::getWorkOrderMessages( + vector<unique_ptr<S::WorkOrderMessage>> *work_order_messages) { + // Iterate over admitted queries until either there are no more + // messages available, or the maximum number of messages have + // been collected. + DCHECK(work_order_messages->empty()); + // TODO(harshad) - Make this function generic enough so that it + // works well when multiple queries are getting executed. + if (admitted_queries_.empty()) { + LOG(WARNING) << "Requesting WorkerMessages when no query is running"; + return; + } + + const std::size_t per_query_share = + FLAGS_max_msgs_per_dispatch_round / admitted_queries_.size(); + DCHECK_GT(per_query_share, 0u); + + vector<std::size_t> finished_queries_ids; + + for (const auto &admitted_query_info : admitted_queries_) { + QueryManagerBase *curr_query_manager = admitted_query_info.second.get(); + DCHECK(curr_query_manager != nullptr); + std::size_t messages_collected_curr_query = 0; + while (messages_collected_curr_query < per_query_share) { + S::WorkOrderMessage *next_work_order_message = + static_cast<QueryManagerDistributed*>(curr_query_manager)->getNextWorkOrderMessage(0); + if (next_work_order_message != nullptr) { + ++messages_collected_curr_query; + work_order_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message)); + } else { + // No more work ordes from the current query at this time. + // Check if the query's execution is over. + if (curr_query_manager->getQueryExecutionState().hasQueryExecutionFinished()) { + // If the query has been executed, remove it. + finished_queries_ids.push_back(admitted_query_info.first); + } + break; + } + } + } + for (const std::size_t finished_qid : finished_queries_ids) { + onQueryCompletion(admitted_queries_[finished_qid].get()); + removeQuery(finished_qid); + } +} + +bool PolicyEnforcerDistributed::admitQuery(QueryHandle *query_handle) { + if (admitted_queries_.size() < PolicyEnforcerBase::kMaxConcurrentQueries) { + // Ok to admit the query. + const std::size_t query_id = query_handle->query_id(); + if (admitted_queries_.find(query_id) == admitted_queries_.end()) { + // NOTE(zuyu): Should call before constructing a 'QueryManager'. + // Otherwise, an InitiateRebuildMessage may be sent before 'QueryContext' + // initializes. + initiateQueryInShiftboss(query_handle); + + // Query with the same ID not present, ok to admit. + admitted_queries_[query_id].reset( + new QueryManagerDistributed(query_handle, shiftboss_directory_, foreman_client_id_, bus_)); + return true; + } else { + LOG(ERROR) << "Query with the same ID " << query_id << " exists"; + return false; + } + } else { + // This query will have to wait. + waiting_queries_.push(query_handle); + return false; + } +} + +void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message) { + S::InitiateRebuildResponseMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + const std::size_t query_id = proto.query_id(); + DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end()); + + QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get()); + + const std::size_t num_rebuild_work_orders = proto.num_rebuild_work_orders(); + query_manager->processInitiateRebuildResponseMessage(proto.operator_index(), num_rebuild_work_orders); + shiftboss_directory_->addNumQueuedWorkOrders(proto.shiftboss_index(), num_rebuild_work_orders); + + if (query_manager->getQueryExecutionState().hasQueryExecutionFinished()) { + onQueryCompletion(query_manager); + + removeQuery(query_id); + if (!waiting_queries_.empty()) { + // Admit the earliest waiting query. + QueryHandle *new_query = waiting_queries_.front(); + waiting_queries_.pop(); + admitQuery(new_query); + } + } +} + +void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_handle) { + S::QueryInitiateMessage proto; + proto.set_query_id(query_handle->query_id()); + proto.mutable_catalog_database_cache()->MergeFrom(query_handle->getCatalogDatabaseCacheProto()); + proto.mutable_query_context()->MergeFrom(query_handle->getQueryContextProto()); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kQueryInitiateMessage); + free(proto_bytes); + + LOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage + << "') to Shiftboss 0"; + + // TODO(zuyu): Multiple Shiftbosses support. + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + shiftboss_directory_->getClientId(0), + move(message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ + << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0); + + // Wait Shiftboss for QueryInitiateResponseMessage. + const tmb::AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true); + const TaggedMessage &tagged_message = annotated_message.tagged_message; + DCHECK_EQ(kQueryInitiateResponseMessage, tagged_message.message_type()); + LOG(INFO) << "PolicyEnforcerDistributed received typed '" << tagged_message.message_type() + << "' message from client " << annotated_message.sender; + + S::QueryInitiateResponseMessage proto_response; + CHECK(proto_response.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); +} + +void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manager) { + const QueryHandle *query_handle = query_manager->query_handle(); + + const CatalogRelation *query_result = query_handle->getQueryResultRelation(); + const tmb::client_id cli_id = query_handle->getClientId(); + const std::size_t query_id = query_handle->query_id(); + + if (query_result == nullptr) { + // Clean up query execution states, i.e., QueryContext, in Shiftboss. + serialization::QueryTeardownMessage proto; + proto.set_query_id(query_id); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kQueryTeardownMessage); + + // TODO(zuyu): Support multiple shiftbosses. + LOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage + << "') to Shiftboss 0"; + tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + shiftboss_directory_->getClientId(0), + move(message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ + << " to Shiftboss"; + + TaggedMessage cli_message(kQueryExecutionSuccessMessage); + + // Notify the CLI query execution successfully. + LOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage + << "') to CLI with TMB client id " << cli_id; + send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + cli_id, + move(cli_message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ + << " to CLI with TMB client ID " << cli_id; + return; + } + + // NOTE(zuyu): SaveQueryResultMessage implicitly triggers QueryTeardown in Shiftboss. + S::SaveQueryResultMessage proto; + proto.set_query_id(query_id); + proto.set_relation_id(query_result->getID()); + + const vector<block_id> blocks(query_result->getBlocksSnapshot()); + for (const block_id block : blocks) { + proto.add_blocks(block); + } + + proto.set_cli_id(cli_id); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kSaveQueryResultMessage); + free(proto_bytes); + + LOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage + << "') to Shiftboss 0"; + // TODO(zuyu): Support multiple shiftbosses. + const tmb::MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + shiftboss_directory_->getClientId(0), + move(message)); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK) + << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ + << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0); +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/PolicyEnforcerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp new file mode 100644 index 0000000..16ebe36 --- /dev/null +++ b/query_execution/PolicyEnforcerDistributed.hpp @@ -0,0 +1,113 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#ifndef QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_ +#define QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_ + +#include <cstddef> +#include <memory> +#include <vector> + +#include "query_execution/PolicyEnforcerBase.hpp" +#include "query_execution/ShiftbossDirectory.hpp" +#include "utility/Macros.hpp" + +#include "tmb/id_typedefs.h" + +namespace tmb { +class MessageBus; +class TaggedMessage; +} + +namespace quickstep { + +class CatalogDatabaseLite; +class QueryHandle; +class QueryManagerBase; + +namespace serialization { class WorkOrderMessage; } + +/** \addtogroup QueryExecution + * @{ + */ + +/** + * @brief A class that ensures that a high level policy is maintained + * in sharing resources among concurrent queries. + **/ +class PolicyEnforcerDistributed final : public PolicyEnforcerBase { + public: + /** + * @brief Constructor. + * + * @param foreman_client_id The TMB client ID of the Foreman. + * @param catalog_database The CatalogDatabase used. + * @param bus The TMB. + **/ + PolicyEnforcerDistributed(const tmb::client_id foreman_client_id, + CatalogDatabaseLite *catalog_database, + ShiftbossDirectory *shiftboss_directory, + tmb::MessageBus *bus, + const bool profile_individual_workorders = false) + : PolicyEnforcerBase(catalog_database, profile_individual_workorders), + foreman_client_id_(foreman_client_id), + shiftboss_directory_(shiftboss_directory), + bus_(bus) {} + + /** + * @brief Destructor. + **/ + ~PolicyEnforcerDistributed() override {} + + bool admitQuery(QueryHandle *query_handle) override; + + /** + * @brief Get work order messages to be dispatched. These messages come from + * the active queries. + * + * @param work_order_messages The work order messages to be dispatched. + **/ + void getWorkOrderMessages( + std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_messages); + + /** + * @brief Process the initiate rebuild work order response message. + * + * @param tagged_message The message. + **/ + void processInitiateRebuildResponseMessage(const tmb::TaggedMessage &tagged_message); + + private: + void decrementNumQueuedWorkOrders(const std::size_t shiftboss_index) override { + shiftboss_directory_->decrementNumQueuedWorkOrders(shiftboss_index); + } + + void onQueryCompletion(QueryManagerBase *query_manager) override; + + void initiateQueryInShiftboss(QueryHandle *query_handle); + + const tmb::client_id foreman_client_id_; + + ShiftbossDirectory *shiftboss_directory_; + + tmb::MessageBus *bus_; + + DISALLOW_COPY_AND_ASSIGN(PolicyEnforcerDistributed); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_DISTRIBUTED_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index f680d35..20b684e 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -120,13 +120,25 @@ message InitiateRebuildResponseMessage { required uint64 shiftboss_index = 4; } +message QueryTeardownMessage { + required uint64 query_id = 1; +} + message SaveQueryResultMessage { - required int32 relation_id = 1; - repeated fixed64 blocks = 2 [packed=true]; + required uint64 query_id = 1; + required int32 relation_id = 2; + repeated fixed64 blocks = 3 [packed=true]; + + required uint32 cli_id = 4; // tmb::client_id. } message SaveQueryResultResponseMessage { required int32 relation_id = 1; + required uint32 cli_id = 2; // tmb::client_id. +} + +message QueryExecutionSuccessMessage { + optional CatalogRelationSchema result_relation = 1; } // BlockLocator related messages. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/QueryExecutionTypedefs.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp index 4643096..d154d84 100644 --- a/query_execution/QueryExecutionTypedefs.hpp +++ b/query_execution/QueryExecutionTypedefs.hpp @@ -86,9 +86,14 @@ enum QueryExecutionMessageType : message_type_id { kInitiateRebuildMessage, // From Foreman to Shiftboss. kInitiateRebuildResponseMessage, // From Shiftboss to Foreman. + kQueryTeardownMessage, // From Foreman to Shiftboss. + kSaveQueryResultMessage, // From Foreman to Shiftboss. kSaveQueryResultResponseMessage, // From Shiftboss to Foreman. + // From Foreman to CLI. + kQueryExecutionSuccessMessage, + // BlockLocator related messages, sorted in a life cycle of StorageManager // with a unique block domain. kBlockDomainRegistrationMessage, // From Worker to BlockLocator. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/QueryManagerBase.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp index c60e323..8e37da8 100644 --- a/query_execution/QueryManagerBase.cpp +++ b/query_execution/QueryManagerBase.cpp @@ -37,7 +37,8 @@ using std::pair; namespace quickstep { QueryManagerBase::QueryManagerBase(QueryHandle *query_handle) - : query_id_(DCHECK_NOTNULL(query_handle)->query_id()), + : query_handle_(DCHECK_NOTNULL(query_handle)), + query_id_(query_handle->query_id()), query_dag_(DCHECK_NOTNULL( DCHECK_NOTNULL(query_handle->getQueryPlanMutable())->getQueryPlanDAGMutable())), num_operators_in_dag_(query_dag_->size()), http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3c2749ea/query_execution/QueryManagerBase.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp index 782b8ed..a274742 100644 --- a/query_execution/QueryManagerBase.hpp +++ b/query_execution/QueryManagerBase.hpp @@ -74,6 +74,13 @@ class QueryManagerBase { virtual ~QueryManagerBase() {} /** + * @brief Get the query handle. + **/ + const QueryHandle* query_handle() const { + return query_handle_; + } + + /** * @brief Get the QueryExecutionState for this query. **/ inline const QueryExecutionState& getQueryExecutionState() const { @@ -252,9 +259,11 @@ class QueryManagerBase { return query_exec_state_->hasRebuildInitiated(index); } + const QueryHandle *query_handle_; + const std::size_t query_id_; - DAG<RelationalOperator, bool> *query_dag_; + DAG<RelationalOperator, bool> *query_dag_; // Owned by 'query_handle_'. const dag_node_index num_operators_in_dag_; // For all nodes, store their receiving dependents.