Minor fixes for the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1325a6ae Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1325a6ae Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1325a6ae Branch: refs/heads/partitioned-aggregation Commit: 1325a6ae2c909fbadb4b0661478f42a5e6687932 Parents: 6ee9842 Author: Zuyu Zhang <zu...@twitter.com> Authored: Sat Aug 13 23:22:41 2016 -0700 Committer: Zuyu Zhang <zu...@twitter.com> Committed: Sat Aug 13 23:22:41 2016 -0700 ---------------------------------------------------------------------- query_execution/CMakeLists.txt | 16 +++++++------- query_execution/PolicyEnforcerDistributed.cpp | 10 ++++----- query_execution/PolicyEnforcerDistributed.hpp | 6 +++--- query_execution/QueryExecutionTypedefs.hpp | 4 ++-- query_execution/Shiftboss.cpp | 20 +++++++++++++++--- query_execution/Shiftboss.hpp | 22 +++++++++++++------- .../tests/execution_generator/CMakeLists.txt | 2 +- 7 files changed, 51 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index 74fcafb..4033594 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -31,7 +31,7 @@ endif() add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp) if (ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp) -endif() +endif(ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp) add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp) add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp) @@ -52,12 +52,12 @@ add_library(quickstep_queryexecution_QueryExecutionUtil ../empty_src.cpp QueryEx add_library(quickstep_queryexecution_QueryManagerBase QueryManagerBase.cpp QueryManagerBase.hpp) if (ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_QueryManagerDistributed QueryManagerDistributed.cpp QueryManagerDistributed.hpp) -endif() +endif(ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_QueryManagerSingleNode QueryManagerSingleNode.cpp QueryManagerSingleNode.hpp) if (ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_Shiftboss Shiftboss.cpp Shiftboss.hpp) add_library(quickstep_queryexecution_ShiftbossDirectory ../empty_src.cpp ShiftbossDirectory.hpp) -endif() +endif(ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_WorkOrderProtosContainer ../empty_src.cpp WorkOrderProtosContainer.hpp) add_library(quickstep_queryexecution_WorkOrdersContainer WorkOrdersContainer.cpp WorkOrdersContainer.hpp) add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp) @@ -80,7 +80,7 @@ if (ENABLE_DISTRIBUTED) quickstep_threading_ThreadUtil quickstep_utility_Macros tmb) -endif() +endif(ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution_ForemanBase glog quickstep_threading_Thread @@ -223,7 +223,7 @@ if (ENABLE_DISTRIBUTED) quickstep_utility_DAG quickstep_utility_Macros tmb) -endif() +endif(ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode quickstep_catalog_CatalogTypedefs quickstep_queryexecution_QueryContext @@ -262,7 +262,7 @@ if (ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution_ShiftbossDirectory quickstep_utility_Macros tmb) -endif() +endif(ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution_WorkOrderProtosContainer glog quickstep_relationaloperators_WorkOrder_proto @@ -320,7 +320,7 @@ if (ENABLE_DISTRIBUTED) quickstep_queryexecution_QueryManagerDistributed quickstep_queryexecution_Shiftboss quickstep_queryexecution_ShiftbossDirectory) -endif() +endif(ENABLE_DISTRIBUTED) # Tests: if (ENABLE_DISTRIBUTED) @@ -346,7 +346,7 @@ if (ENABLE_DISTRIBUTED) tmb ${LIBS}) add_test(BlockLocator_unittest BlockLocator_unittest) -endif() +endif(ENABLE_DISTRIBUTED) add_executable(QueryManagerSingleNode_unittest "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManagerSingleNode_unittest.cpp") http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp index c76a9e1..47491ed 100644 --- a/query_execution/PolicyEnforcerDistributed.cpp +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -58,16 +58,16 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that" " can be allocated in a single round of dispatch of messages to" " the workers."); -void PolicyEnforcerDistributed::getWorkOrderMessages( - vector<unique_ptr<S::WorkOrderMessage>> *work_order_messages) { +void PolicyEnforcerDistributed::getWorkOrderProtoMessages( + vector<unique_ptr<S::WorkOrderMessage>> *work_order_proto_messages) { // Iterate over admitted queries until either there are no more // messages available, or the maximum number of messages have // been collected. - DCHECK(work_order_messages->empty()); + DCHECK(work_order_proto_messages->empty()); // TODO(harshad) - Make this function generic enough so that it // works well when multiple queries are getting executed. if (admitted_queries_.empty()) { - LOG(WARNING) << "Requesting WorkerMessages when no query is running"; + LOG(WARNING) << "Requesting WorkOrderProtoMessages when no query is running"; return; } @@ -86,7 +86,7 @@ void PolicyEnforcerDistributed::getWorkOrderMessages( static_cast<QueryManagerDistributed*>(curr_query_manager)->getNextWorkOrderMessage(0); if (next_work_order_message != nullptr) { ++messages_collected_curr_query; - work_order_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message)); + work_order_proto_messages->push_back(unique_ptr<S::WorkOrderMessage>(next_work_order_message)); } else { // No more work ordes from the current query at this time. // Check if the query's execution is over. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/PolicyEnforcerDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp index 16ebe36..bce3e0c 100644 --- a/query_execution/PolicyEnforcerDistributed.hpp +++ b/query_execution/PolicyEnforcerDistributed.hpp @@ -76,10 +76,10 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase { * @brief Get work order messages to be dispatched. These messages come from * the active queries. * - * @param work_order_messages The work order messages to be dispatched. + * @param work_order_proto_messages The work order messages to be dispatched. **/ - void getWorkOrderMessages( - std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_messages); + void getWorkOrderProtoMessages( + std::vector<std::unique_ptr<serialization::WorkOrderMessage>> *work_order_proto_messages); /** * @brief Process the initiate rebuild work order response message. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/QueryExecutionTypedefs.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp index 33a93b0..bba67e3 100644 --- a/query_execution/QueryExecutionTypedefs.hpp +++ b/query_execution/QueryExecutionTypedefs.hpp @@ -63,8 +63,8 @@ using ClientIDMap = ThreadIDBasedMap<client_id, // We sort the following message types in the order of a life cycle of a query. enum QueryExecutionMessageType : message_type_id { - kAdmitRequestMessage, // Requesting a query (or queries) to be admitted, from - // the main thread to Foreman. + kAdmitRequestMessage = 0, // Requesting a query (or queries) to be admitted, from + // the main thread to Foreman. kWorkOrderMessage, // From Foreman to Worker. kWorkOrderCompleteMessage, // From Worker to Foreman. kCatalogRelationNewBlockMessage, // From InsertDestination to Foreman. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/Shiftboss.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp index ddfd47f..5c2c5e0 100644 --- a/query_execution/Shiftboss.cpp +++ b/query_execution/Shiftboss.cpp @@ -147,10 +147,11 @@ void Shiftboss::run() { proto.relation_id()); break; } - case kWorkOrderCompleteMessage: // Fall through. - case kRebuildWorkOrderCompleteMessage: + case kCatalogRelationNewBlockMessage: // Fall through. case kDataPipelineMessage: - case kWorkOrderFeedbackMessage: { + case kWorkOrderFeedbackMessage: + case kWorkOrderCompleteMessage: + case kRebuildWorkOrderCompleteMessage: { DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ << "') forwarded typed '" << annotated_message.tagged_message.message_type() << "' message from Worker with TMB client ID '" << annotated_message.sender @@ -165,6 +166,15 @@ void Shiftboss::run() { CHECK(send_status == MessageBus::SendStatus::kOK); break; } + case kQueryTeardownMessage: { + const TaggedMessage &tagged_message = annotated_message.tagged_message; + + serialization::QueryTeardownMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + query_contexts_.erase(proto.query_id()); + break; + } case kSaveQueryResultMessage: { const TaggedMessage &tagged_message = annotated_message.tagged_message; @@ -175,8 +185,12 @@ void Shiftboss::run() { storage_manager_->saveBlockOrBlob(proto.blocks(i)); } + // Clean up query execution states, i.e., QueryContext. + query_contexts_.erase(proto.query_id()); + serialization::SaveQueryResultResponseMessage proto_response; proto_response.set_relation_id(proto.relation_id()); + proto_response.set_cli_id(proto.cli_id()); const size_t proto_response_length = proto_response.ByteSize(); char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length)); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_execution/Shiftboss.hpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp index 30a8d1a..94b10a2 100644 --- a/query_execution/Shiftboss.hpp +++ b/query_execution/Shiftboss.hpp @@ -21,6 +21,7 @@ #define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_ #include <cstddef> +#include <cstdint> #include <memory> #include <unordered_map> @@ -97,27 +98,34 @@ class Shiftboss : public Thread { bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage); bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage); + bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage); + bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage); + // Message sent to Worker. bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage); - // Message sent to Foreman. - bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage); - // Forward the following message types from Foreman to Workers. bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage); bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage); // Forward the following message types from Workers to Foreman. + bus_->RegisterClientAsReceiver(shiftboss_client_id_, kCatalogRelationNewBlockMessage); + bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage); + + bus_->RegisterClientAsReceiver(shiftboss_client_id_, kDataPipelineMessage); + bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage); + + bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderFeedbackMessage); + bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage); + bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage); bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage); bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage); bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage); - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage); + // Clean up query execution states, i.e., QueryContext. + bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage); // Stop itself. bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1325a6ae/query_optimizer/tests/execution_generator/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt index 1980980..0c00ff6 100644 --- a/query_optimizer/tests/execution_generator/CMakeLists.txt +++ b/query_optimizer/tests/execution_generator/CMakeLists.txt @@ -83,4 +83,4 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Join) file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Select) file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/StringPatternMatching) file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/TableGenerator) -file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update) \ No newline at end of file +file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)