Added ForemanDistributed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/203d3ea6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/203d3ea6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/203d3ea6 Branch: refs/heads/quickstep-28-29 Commit: 203d3ea66e4c1f72f7edc858b5b243ae9db33eba Parents: 1325a6a Author: Zuyu Zhang <zu...@twitter.com> Authored: Sat Aug 13 23:37:59 2016 -0700 Committer: Zuyu Zhang <zu...@twitter.com> Committed: Mon Aug 15 13:48:32 2016 -0700 ---------------------------------------------------------------------- query_execution/CMakeLists.txt | 24 ++ query_execution/ForemanDistributed.cpp | 335 ++++++++++++++++++++++++++++ query_execution/ForemanDistributed.hpp | 130 +++++++++++ 3 files changed, 489 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/203d3ea6/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index 4033594..1b27194 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -33,6 +33,9 @@ if (ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp) endif(ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp) +if (ENABLE_DISTRIBUTED) + add_library(quickstep_queryexecution_ForemanDistributed ForemanDistributed.cpp ForemanDistributed.hpp) +endif(ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_ForemanSingleNode ForemanSingleNode.cpp ForemanSingleNode.hpp) add_library(quickstep_queryexecution_PolicyEnforcerBase PolicyEnforcerBase.cpp PolicyEnforcerBase.hpp) if (ENABLE_DISTRIBUTED) @@ -86,6 +89,26 @@ target_link_libraries(quickstep_queryexecution_ForemanBase 
quickstep_threading_Thread quickstep_utility_Macros tmb) +if (ENABLE_DISTRIBUTED) + target_link_libraries(quickstep_queryexecution_ForemanDistributed + glog + quickstep_catalog_CatalogDatabase + quickstep_catalog_CatalogRelation + quickstep_catalog_CatalogTypedefs + quickstep_catalog_Catalog_proto + quickstep_queryexecution_AdmitRequestMessage + quickstep_queryexecution_ForemanBase + quickstep_queryexecution_PolicyEnforcerDistributed + quickstep_queryexecution_QueryExecutionMessages_proto + quickstep_queryexecution_QueryExecutionTypedefs + quickstep_queryexecution_QueryExecutionUtil + quickstep_queryexecution_ShiftbossDirectory + quickstep_threading_ThreadUtil + quickstep_utility_EqualsAnyConstant + quickstep_utility_Macros + tmb + ${GFLAGS_LIB_NAME}) +endif(ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution_ForemanSingleNode glog quickstep_queryexecution_AdmitRequestMessage @@ -316,6 +339,7 @@ target_link_libraries(quickstep_queryexecution if (ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution quickstep_queryexecution_BlockLocator + quickstep_queryexecution_ForemanDistributed quickstep_queryexecution_PolicyEnforcerDistributed quickstep_queryexecution_QueryManagerDistributed quickstep_queryexecution_Shiftboss http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/203d3ea6/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp new file mode 100644 index 0000000..29f5b9b --- /dev/null +++ b/query_execution/ForemanDistributed.cpp @@ -0,0 +1,335 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +#include "query_execution/ForemanDistributed.hpp" + +#include <cstddef> +#include <cstdio> +#include <cstdlib> +#include <memory> +#include <utility> +#include <vector> + +#include "catalog/Catalog.pb.h" +#include "catalog/CatalogDatabase.hpp" +#include "catalog/CatalogRelation.hpp" +#include "catalog/CatalogTypedefs.hpp" +#include "query_execution/AdmitRequestMessage.hpp" +#include "query_execution/PolicyEnforcerDistributed.hpp" +#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_execution/QueryExecutionUtil.hpp" +#include "query_execution/ShiftbossDirectory.hpp" +#include "threading/ThreadUtil.hpp" +#include "utility/EqualsAnyConstant.hpp" + +#include "glog/logging.h" + +#include "tmb/address.h" +#include "tmb/id_typedefs.h" +#include "tmb/message_bus.h" +#include "tmb/message_style.h" +#include "tmb/tagged_message.h" + +using std::move; +using std::size_t; +using std::unique_ptr; +using std::vector; + +using tmb::AnnotatedMessage; +using tmb::MessageBus; +using tmb::TaggedMessage; +using tmb::client_id; + +namespace quickstep { + +namespace S = serialization; + +class QueryHandle; + +ForemanDistributed::ForemanDistributed( + MessageBus *bus, + CatalogDatabaseLite *catalog_database, + const int cpu_id, + const bool profile_individual_workorders) + : ForemanBase(bus, cpu_id), + catalog_database_(DCHECK_NOTNULL(catalog_database)) { + const std::vector<QueryExecutionMessageType> sender_message_types{ + kShiftbossRegistrationResponseMessage, + kQueryInitiateMessage, 
+ kWorkOrderMessage, + kInitiateRebuildMessage, + kQueryTeardownMessage, + kSaveQueryResultMessage, + kQueryExecutionSuccessMessage, + kPoisonMessage}; + + for (const auto message_type : sender_message_types) { + bus_->RegisterClientAsSender(foreman_client_id_, message_type); + } + + const std::vector<QueryExecutionMessageType> receiver_message_types{ + kShiftbossRegistrationMessage, + kAdmitRequestMessage, + kQueryInitiateResponseMessage, + kCatalogRelationNewBlockMessage, + kDataPipelineMessage, + kInitiateRebuildResponseMessage, + kWorkOrderCompleteMessage, + kRebuildWorkOrderCompleteMessage, + kWorkOrderFeedbackMessage, + kSaveQueryResultResponseMessage, + kPoisonMessage}; + + for (const auto message_type : receiver_message_types) { + bus_->RegisterClientAsReceiver(foreman_client_id_, message_type); + } + + policy_enforcer_.reset(new PolicyEnforcerDistributed( + foreman_client_id_, + catalog_database_, + &shiftboss_directory_, + bus_, + profile_individual_workorders)); +} + +void ForemanDistributed::run() { + if (cpu_id_ >= 0) { + // We can pin the foreman thread to a CPU if specified. + ThreadUtil::BindToCPU(cpu_id_); + } + + // Wait until at least one Shiftboss has registered before entering the event loop. 
+ if (shiftboss_directory_.empty()) { + const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true); + const TaggedMessage &tagged_message = annotated_message.tagged_message; + DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type()); + DLOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type() + << "' message from client " << annotated_message.sender; + + S::ShiftbossRegistrationMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + processShiftbossRegistrationMessage(annotated_message.sender, proto.work_order_capacity()); + DCHECK_EQ(1u, shiftboss_directory_.size()); + } + + // Event loop + for (;;) { + // Receive() causes this thread to sleep until next message is received. + const AnnotatedMessage annotated_message = + bus_->Receive(foreman_client_id_, 0, true); + const TaggedMessage &tagged_message = annotated_message.tagged_message; + const tmb::message_type_id message_type = tagged_message.message_type(); + DLOG(INFO) << "ForemanDistributed received typed '" << message_type + << "' message from client " << annotated_message.sender; + switch (message_type) { + case kShiftbossRegistrationMessage: { + S::ShiftbossRegistrationMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + processShiftbossRegistrationMessage(annotated_message.sender, proto.work_order_capacity()); + break; + } + case kAdmitRequestMessage: { + const AdmitRequestMessage *request_message = + static_cast<const AdmitRequestMessage*>(tagged_message.message()); + + const vector<QueryHandle *> &query_handles = request_message->getQueryHandles(); + DCHECK(!query_handles.empty()); + + bool all_queries_admitted = true; + if (query_handles.size() == 1u) { + all_queries_admitted = + policy_enforcer_->admitQuery(query_handles.front()); + } else { + all_queries_admitted = policy_enforcer_->admitQueries(query_handles); + } + if 
+ (!all_queries_admitted) { + LOG(WARNING) << "The scheduler could not admit all the queries"; + // TODO(harshad) - Inform the main thread about the failure. + } + break; + } + case kQueryInitiateResponseMessage: { + // TODO(zuyu): check the query id. + break; + } + case kCatalogRelationNewBlockMessage: // Fall through + case kDataPipelineMessage: + case kRebuildWorkOrderCompleteMessage: + case kWorkOrderCompleteMessage: + case kWorkOrderFeedbackMessage: { + policy_enforcer_->processMessage(tagged_message); + break; + } + case kInitiateRebuildResponseMessage: { + // A unique case in the distributed version. + policy_enforcer_->processInitiateRebuildResponseMessage(tagged_message); + break; + } + case kSaveQueryResultResponseMessage: { + S::SaveQueryResultResponseMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + processSaveQueryResultResponseMessage(proto.cli_id(), proto.relation_id()); + break; + } + case kPoisonMessage: { + if (policy_enforcer_->hasQueries()) { + LOG(WARNING) << "ForemanDistributed thread exiting while some queries are " + "under execution or waiting to be admitted"; + } + + // Shutdown all Shiftbosses. 
+ tmb::Address shiftboss_addresses; + for (std::size_t i = 0; i < shiftboss_directory_.size(); ++i) { + shiftboss_addresses.AddRecipient(shiftboss_directory_.getClientId(i)); + } + + tmb::MessageStyle broadcast_style; + broadcast_style.Broadcast(true); + + TaggedMessage poison_message(kPoisonMessage); + + const MessageBus::SendStatus send_status = + bus_->Send(foreman_client_id_, + shiftboss_addresses, + broadcast_style, + move(poison_message)); + DCHECK(send_status == MessageBus::SendStatus::kOK); + return; + } + default: + LOG(FATAL) << "Unknown message type to ForemanDistributed"; + } + + if (canCollectNewMessages(message_type)) { + vector<unique_ptr<S::WorkOrderMessage>> new_messages; + policy_enforcer_->getWorkOrderProtoMessages(&new_messages); + dispatchWorkOrderMessages(new_messages); + } + } +} + +bool ForemanDistributed::canCollectNewMessages(const tmb::message_type_id message_type) { + return !QUICKSTEP_EQUALS_ANY_CONSTANT(message_type, + kCatalogRelationNewBlockMessage, + kWorkOrderFeedbackMessage) && + // TODO(zuyu): Multiple Shiftbosses support. + !shiftboss_directory_.hasReachedCapacity(0); +} + +void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::WorkOrderMessage>> &messages) { + for (const auto &message : messages) { + DCHECK(message != nullptr); + // TODO(zuyu): Multiple Shiftbosses support. 
+ sendWorkOrderMessage(0, *message); + shiftboss_directory_.incrementNumQueuedWorkOrders(0); + } +} + +void ForemanDistributed::sendWorkOrderMessage(const size_t shiftboss_index, + const S::WorkOrderMessage &proto) { + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kWorkOrderMessage); + free(proto_bytes); + + const client_id shiftboss_client_id = shiftboss_directory_.getClientId(shiftboss_index); + DLOG(INFO) << "ForemanDistributed sent WorkOrderMessage (typed '" << kWorkOrderMessage + << "') to Shiftboss with TMB client ID " << shiftboss_client_id; + const MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + shiftboss_client_id, + move(message)); + CHECK(send_status == MessageBus::SendStatus::kOK); +} + +void ForemanDistributed::printWorkOrderProfilingResults(const std::size_t query_id, + std::FILE *out) const { + const std::vector<WorkOrderTimeEntry> &recorded_times = + policy_enforcer_->getProfilingResults(query_id); + fputs("Query ID,Worker ID,Operator ID,Time (microseconds)\n", out); + for (const auto &workorder_entry : recorded_times) { + const std::size_t worker_id = workorder_entry.worker_id; + fprintf(out, + "%zu,%zu,%lu,%lu\n", + query_id, + worker_id, + workorder_entry.operator_id, // Operator ID. + workorder_entry.end_time - workorder_entry.start_time); // Time. 
+ } +} + +void ForemanDistributed::processShiftbossRegistrationMessage(const client_id shiftboss_client_id, + const std::size_t work_order_capacity) { + S::ShiftbossRegistrationResponseMessage proto; + proto.set_shiftboss_index(shiftboss_directory_.size()); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kShiftbossRegistrationResponseMessage); + free(proto_bytes); + + shiftboss_directory_.addShiftboss(shiftboss_client_id, work_order_capacity); + + DLOG(INFO) << "ForemanDistributed sent ShiftbossRegistrationResponseMessage (typed '" + << kShiftbossRegistrationResponseMessage + << "') to Shiftboss with TMB client id " << shiftboss_client_id; + const MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + shiftboss_client_id, + move(message)); + CHECK(send_status == MessageBus::SendStatus::kOK); +} + +void ForemanDistributed::processSaveQueryResultResponseMessage(const client_id cli_id, + const relation_id result_relation_id) { + S::QueryExecutionSuccessMessage proto; + proto.mutable_result_relation()->MergeFrom( + static_cast<CatalogDatabase*>(catalog_database_)->getRelationById(result_relation_id)->getProto()); + + const size_t proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kQueryExecutionSuccessMessage); + free(proto_bytes); + + // Notify the CLI regarding the query result. 
+ DLOG(INFO) << "ForemanDistributed sent QueryExecutionSuccessMessage (typed '" + << kQueryExecutionSuccessMessage + << "') to CLI with TMB client id " << cli_id; + const MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_, + foreman_client_id_, + cli_id, + move(message)); + CHECK(send_status == MessageBus::SendStatus::kOK); +} + +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/203d3ea6/query_execution/ForemanDistributed.hpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp new file mode 100644 index 0000000..f9a326a --- /dev/null +++ b/query_execution/ForemanDistributed.hpp @@ -0,0 +1,130 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_ +#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_ + +#include <cstddef> +#include <cstdio> +#include <memory> +#include <vector> + +#include "catalog/CatalogTypedefs.hpp" +#include "query_execution/ForemanBase.hpp" +#include "query_execution/PolicyEnforcerDistributed.hpp" +#include "query_execution/ShiftbossDirectory.hpp" +#include "utility/Macros.hpp" + +#include "tmb/id_typedefs.h" + +namespace tmb { class MessageBus; } + +namespace quickstep { + +class CatalogDatabaseLite; + +namespace serialization { class WorkOrderMessage; } + +/** \addtogroup QueryExecution + * @{ + */ + +/** + * @brief The Foreman receives queries from the main thread, messages from the + * policy enforcer and dispatches the work to Shiftbosses. It also + * receives work completion messages from Shiftbosses. + **/ +class ForemanDistributed final : public ForemanBase { + public: + /** + * @brief Constructor. + * + * @param bus A pointer to the TMB. + * @param catalog_database The catalog database where this query is executed. + * @param cpu_id The ID of the CPU to which the Foreman thread can be pinned. + * @param profile_individual_workorders Whether every workorder's execution + * should be profiled or not. + * + * @note If cpu_id is not specified, the Foreman thread can possibly be moved + * around on different CPUs by the OS. + **/ + ForemanDistributed(tmb::MessageBus *bus, + CatalogDatabaseLite *catalog_database, + const int cpu_id = -1, + const bool profile_individual_workorders = false); + + ~ForemanDistributed() override {} + + /** + * @brief Print the results of profiling individual work orders for a given + * query. + * + * TODO(harshad) - Add the name of the operator to the output. + * + * @param query_id The ID of the query for which the results are to be printed. + * @param out The file stream. 
+ **/ + void printWorkOrderProfilingResults(const std::size_t query_id, + std::FILE *out) const; + + protected: + void run() override; + + private: + /** + * @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages, to the + * Shiftbosses. + * + * @param messages The messages to be dispatched. + **/ + void dispatchWorkOrderMessages( + const std::vector<std::unique_ptr<serialization::WorkOrderMessage>> &messages); + + /** + * @brief Send the given message to the specified Shiftboss. + * + * @param worker_index The logical index of the recipient Shiftboss in + * ShiftbossDirectory. + * @param proto The WorkOrderMessage to be sent. + **/ + void sendWorkOrderMessage(const std::size_t worker_index, + const serialization::WorkOrderMessage &proto); + + void processShiftbossRegistrationMessage(const tmb::client_id shiftboss_client_id, + const std::size_t work_order_capacity); + + void processSaveQueryResultResponseMessage(const tmb::client_id cli_id, + const relation_id result_relation_id); + + /** + * @brief Check if we can collect new messages from the PolicyEnforcer. + * + * @param message_type The type of the last received message. + **/ + bool canCollectNewMessages(const tmb::message_type_id message_type); + + ShiftbossDirectory shiftboss_directory_; + + CatalogDatabaseLite *catalog_database_; + + std::unique_ptr<PolicyEnforcerDistributed> policy_enforcer_; + + DISALLOW_COPY_AND_ASSIGN(ForemanDistributed); +}; + +/** @} */ + +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_EXECUTION_FOREMAN_DISTRIBUTED_HPP_