Use two TMBs in Shiftboss: a global bus for communicating with Foreman and a local bus for communicating with Workers.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d8fc9461 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d8fc9461 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d8fc9461 Branch: refs/heads/two-level-tmb Commit: d8fc9461b985ebbdc6c8feee3f3ce874de410f05 Parents: c40c553 Author: Zuyu Zhang <zu...@apache.org> Authored: Wed Feb 8 12:48:31 2017 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Thu Feb 9 15:42:36 2017 -0800 ---------------------------------------------------------------------- cli/distributed/Executor.cpp | 7 +- cli/distributed/Executor.hpp | 4 + query_execution/Shiftboss.cpp | 334 ++++++++++--------- query_execution/Shiftboss.hpp | 79 +++-- .../DistributedExecutionGeneratorTestRunner.cpp | 8 +- .../DistributedExecutionGeneratorTestRunner.hpp | 1 + 6 files changed, 229 insertions(+), 204 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/cli/distributed/Executor.cpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Executor.cpp b/cli/distributed/Executor.cpp index 1d03579..5cc7df0 100644 --- a/cli/distributed/Executor.cpp +++ b/cli/distributed/Executor.cpp @@ -35,6 +35,7 @@ #include "tmb/id_typedefs.h" #include "tmb/native_net_client_message_bus.h" +#include "tmb/pure_memory_message_bus.h" #include "glog/logging.h" @@ -47,6 +48,8 @@ using tmb::client_id; namespace quickstep { void Executor::init() { + bus_local_.Initialize(); + executor_client_id_ = bus_.Connect(); DLOG(INFO) << "Executor TMB Client ID: " << executor_client_id_; @@ -59,7 +62,7 @@ void Executor::init() { for (std::size_t worker_thread_index = 0; worker_thread_index < FLAGS_num_workers; ++worker_thread_index) { - workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_)); + 
workers_.push_back(make_unique<Worker>(worker_thread_index, &bus_local_)); worker_client_ids.push_back(workers_.back()->getBusClientID()); } @@ -76,7 +79,7 @@ void Executor::init() { data_exchanger_.start(); shiftboss_ = - make_unique<Shiftboss>(&bus_, storage_manager_.get(), worker_directory_.get()); + make_unique<Shiftboss>(&bus_, &bus_local_, storage_manager_.get(), worker_directory_.get()); shiftboss_->start(); for (const auto &worker : workers_) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/cli/distributed/Executor.hpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Executor.hpp b/cli/distributed/Executor.hpp index 6ffa756..aafeeae 100644 --- a/cli/distributed/Executor.hpp +++ b/cli/distributed/Executor.hpp @@ -24,6 +24,7 @@ #include <vector> #include "cli/distributed/Role.hpp" +#include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/Shiftboss.hpp" #include "query_execution/Worker.hpp" #include "query_execution/WorkerDirectory.hpp" @@ -65,6 +66,9 @@ class Executor final : public Role { void run() override {} private: + // Used between Shiftboss and Workers. + MessageBusImpl bus_local_; + tmb::client_id executor_client_id_; std::vector<std::unique_ptr<Worker>> workers_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/query_execution/Shiftboss.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp index 2ed42d0..5e6014d 100644 --- a/query_execution/Shiftboss.cpp +++ b/query_execution/Shiftboss.cpp @@ -75,156 +75,164 @@ void Shiftboss::run() { for (;;) { // Receive() is a blocking call, causing this thread to sleep until next // message is received. 
- AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true)); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') received the typed '" << annotated_message.tagged_message.message_type() - << "' message from client " << annotated_message.sender; - switch (annotated_message.tagged_message.message_type()) { - case kQueryInitiateMessage: { - const TaggedMessage &tagged_message = annotated_message.tagged_message; - - serialization::QueryInitiateMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context()); - break; - } - case kWorkOrderMessage: { - const TaggedMessage &tagged_message = annotated_message.tagged_message; - - serialization::WorkOrderMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - const std::size_t query_id = proto.query_id(); - DCHECK_EQ(1u, query_contexts_.count(query_id)); - - WorkOrder *work_order = WorkOrderFactory::ReconstructFromProto(proto.work_order(), - shiftboss_index_, - &database_cache_, - query_contexts_[query_id].get(), - storage_manager_, - shiftboss_client_id_, - bus_); - - unique_ptr<WorkerMessage> worker_message( - WorkerMessage::WorkOrderMessage(work_order, proto.operator_index())); - - TaggedMessage worker_tagged_message(worker_message.get(), - sizeof(*worker_message), - kWorkOrderMessage); - - const size_t worker_index = getSchedulableWorker(); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage - << "') from Foreman to worker " << worker_index; - - const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - workers_->getClientID(worker_index), - move(worker_tagged_message)); - CHECK(send_status == 
MessageBus::SendStatus::kOK); - break; - } - case kInitiateRebuildMessage: { - // Construct rebuild work orders, and send back their number to - // 'ForemanDistributed'. - const TaggedMessage &tagged_message = annotated_message.tagged_message; - - serialization::InitiateRebuildMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - processInitiateRebuildMessage(proto.query_id(), - proto.operator_index(), - proto.insert_destination_index(), - proto.relation_id()); - break; - } - case kCatalogRelationNewBlockMessage: // Fall through. - case kDataPipelineMessage: - case kWorkOrderFeedbackMessage: - case kWorkOrderCompleteMessage: - case kRebuildWorkOrderCompleteMessage: { - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') forwarded typed '" << annotated_message.tagged_message.message_type() - << "' message from Worker with TMB client ID '" << annotated_message.sender - << "' to Foreman with TMB client ID " << foreman_client_id_; - - DCHECK_NE(foreman_client_id_, tmb::kClientIdNone); - const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - foreman_client_id_, - move(annotated_message.tagged_message)); - CHECK(send_status == MessageBus::SendStatus::kOK); - break; - } - case kQueryTeardownMessage: { - const TaggedMessage &tagged_message = annotated_message.tagged_message; + AnnotatedMessage annotated_message; + if (bus_global_->ReceiveIfAvailable(shiftboss_client_id_global_, &annotated_message, 0, true)) { + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ + << "') received the typed '" << annotated_message.tagged_message.message_type() + << "' message from Foreman " << annotated_message.sender; + switch (annotated_message.tagged_message.message_type()) { + case kQueryInitiateMessage: { + const TaggedMessage &tagged_message = annotated_message.tagged_message; + + 
serialization::QueryInitiateMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + processQueryInitiateMessage(proto.query_id(), proto.catalog_database_cache(), proto.query_context()); + break; + } + case kWorkOrderMessage: { + const TaggedMessage &tagged_message = annotated_message.tagged_message; + + serialization::WorkOrderMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + const std::size_t query_id = proto.query_id(); + DCHECK_EQ(1u, query_contexts_.count(query_id)); + + WorkOrder *work_order = WorkOrderFactory::ReconstructFromProto(proto.work_order(), + shiftboss_index_, + &database_cache_, + query_contexts_[query_id].get(), + storage_manager_, + shiftboss_client_id_local_, + bus_local_); + + unique_ptr<WorkerMessage> worker_message( + WorkerMessage::WorkOrderMessage(work_order, proto.operator_index())); + + TaggedMessage worker_tagged_message(worker_message.get(), + sizeof(*worker_message), + kWorkOrderMessage); + + const size_t worker_index = getSchedulableWorker(); + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_local_ + << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage + << "') from Foreman to worker " << worker_index; + + const MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_local_, + shiftboss_client_id_local_, + workers_->getClientID(worker_index), + move(worker_tagged_message)); + CHECK(send_status == MessageBus::SendStatus::kOK); + break; + } + case kInitiateRebuildMessage: { + // Construct rebuild work orders, and send back their number to + // 'ForemanDistributed'. 
+ const TaggedMessage &tagged_message = annotated_message.tagged_message; + + serialization::InitiateRebuildMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + processInitiateRebuildMessage(proto.query_id(), + proto.operator_index(), + proto.insert_destination_index(), + proto.relation_id()); + break; + } + case kQueryTeardownMessage: { + const TaggedMessage &tagged_message = annotated_message.tagged_message; - serialization::QueryTeardownMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + serialization::QueryTeardownMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - query_contexts_.erase(proto.query_id()); - break; + query_contexts_.erase(proto.query_id()); + break; + } + case kSaveQueryResultMessage: { + const TaggedMessage &tagged_message = annotated_message.tagged_message; + + serialization::SaveQueryResultMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + for (int i = 0; i < proto.blocks_size(); ++i) { + storage_manager_->saveBlockOrBlob(proto.blocks(i)); + } + + // Clean up query execution states, i.e., QueryContext. 
+ query_contexts_.erase(proto.query_id()); + + serialization::SaveQueryResultResponseMessage proto_response; + proto_response.set_query_id(proto.query_id()); + proto_response.set_relation_id(proto.relation_id()); + proto_response.set_cli_id(proto.cli_id()); + proto_response.set_shiftboss_index(shiftboss_index_); + + const size_t proto_response_length = proto_response.ByteSize(); + char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length)); + CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length)); + + TaggedMessage message_response(static_cast<const void*>(proto_response_bytes), + proto_response_length, + kSaveQueryResultResponseMessage); + free(proto_response_bytes); + + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ + << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage + << "') to Foreman with TMB client ID " << foreman_client_id_; + const MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_global_, + shiftboss_client_id_global_, + foreman_client_id_, + move(message_response)); + CHECK(send_status == MessageBus::SendStatus::kOK); + break; + } + case kPoisonMessage: { + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ + << "') forwarded PoisonMessage (typed '" << kPoisonMessage + << "') from Foreman to all workers"; + + tmb::MessageStyle broadcast_style; + broadcast_style.Broadcast(true); + + const MessageBus::SendStatus send_status = + bus_local_->Send(shiftboss_client_id_local_, worker_addresses_, broadcast_style, + move(annotated_message.tagged_message)); + CHECK(send_status == MessageBus::SendStatus::kOK); + return; + } + default: { + LOG(FATAL) << "Unknown TMB message type"; + } } - case kSaveQueryResultMessage: { - const TaggedMessage &tagged_message = annotated_message.tagged_message; - - serialization::SaveQueryResultMessage proto; - 
CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + } - for (int i = 0; i < proto.blocks_size(); ++i) { - storage_manager_->saveBlockOrBlob(proto.blocks(i)); + while (bus_local_->ReceiveIfAvailable(shiftboss_client_id_local_, &annotated_message, 0, true)) { + switch (annotated_message.tagged_message.message_type()) { + case kCatalogRelationNewBlockMessage: + case kDataPipelineMessage: + case kWorkOrderFeedbackMessage: + case kWorkOrderCompleteMessage: + case kRebuildWorkOrderCompleteMessage: { + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ + << "') forwarded typed '" << annotated_message.tagged_message.message_type() + << "' message from Worker with TMB client ID '" << annotated_message.sender + << "' to Foreman with TMB client ID " << foreman_client_id_; + + DCHECK_NE(foreman_client_id_, tmb::kClientIdNone); + const MessageBus::SendStatus send_status = + QueryExecutionUtil::SendTMBMessage(bus_global_, + shiftboss_client_id_global_, + foreman_client_id_, + move(annotated_message.tagged_message)); + CHECK(send_status == MessageBus::SendStatus::kOK); + break; + } + default: { + LOG(FATAL) << "Unknown TMB message type"; } - - // Clean up query execution states, i.e., QueryContext. 
- query_contexts_.erase(proto.query_id()); - - serialization::SaveQueryResultResponseMessage proto_response; - proto_response.set_query_id(proto.query_id()); - proto_response.set_relation_id(proto.relation_id()); - proto_response.set_cli_id(proto.cli_id()); - proto_response.set_shiftboss_index(shiftboss_index_); - - const size_t proto_response_length = proto_response.ByteSize(); - char *proto_response_bytes = static_cast<char*>(malloc(proto_response_length)); - CHECK(proto_response.SerializeToArray(proto_response_bytes, proto_response_length)); - - TaggedMessage message_response(static_cast<const void*>(proto_response_bytes), - proto_response_length, - kSaveQueryResultResponseMessage); - free(proto_response_bytes); - - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage - << "') to Foreman with TMB client ID " << foreman_client_id_; - const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, - foreman_client_id_, - move(message_response)); - CHECK(send_status == MessageBus::SendStatus::kOK); - break; - } - case kPoisonMessage: { - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ - << "') forwarded PoisonMessage (typed '" << kPoisonMessage - << "') from Foreman to all workers"; - - tmb::MessageStyle broadcast_style; - broadcast_style.Broadcast(true); - - const MessageBus::SendStatus send_status = - bus_->Send(shiftboss_client_id_, - worker_addresses_, - broadcast_style, - move(annotated_message.tagged_message)); - CHECK(send_status == MessageBus::SendStatus::kOK); - return; - } - default: { - LOG(FATAL) << "Unknown TMB message type"; } } } @@ -264,21 +272,21 @@ void Shiftboss::registerWithForeman() { kShiftbossRegistrationMessage); free(proto_bytes); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ + DLOG(INFO) << "Shiftboss " 
<< shiftboss_index_ << " (id '" << shiftboss_client_id_global_ << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage << "') to all"; tmb::MessageBus::SendStatus send_status = - bus_->Send(shiftboss_client_id_, all_addresses, style, move(message)); + bus_global_->Send(shiftboss_client_id_global_, all_addresses, style, move(message)); DCHECK(send_status == tmb::MessageBus::SendStatus::kOK); } void Shiftboss::processShiftbossRegistrationResponseMessage() { - AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true)); + AnnotatedMessage annotated_message(bus_global_->Receive(shiftboss_client_id_global_, 0, true)); const TaggedMessage &tagged_message = annotated_message.tagged_message; DCHECK_EQ(kShiftbossRegistrationResponseMessage, tagged_message.message_type()); foreman_client_id_ = annotated_message.sender; - DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ + DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_local_ << "') received the typed '" << kShiftbossRegistrationResponseMessage << "' message from ForemanDistributed with client " << foreman_client_id_; @@ -289,10 +297,10 @@ void Shiftboss::processShiftbossRegistrationResponseMessage() { storage_manager_->sendBlockDomainToShiftbossIndexMessage(shiftboss_index_); // Forward this message to Workers regarding <shiftboss_index_>. 
- QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_, + QueryExecutionUtil::BroadcastMessage(shiftboss_client_id_local_, worker_addresses_, move(annotated_message.tagged_message), - bus_); + bus_local_); } void Shiftboss::processQueryInitiateMessage( @@ -302,7 +310,7 @@ void Shiftboss::processQueryInitiateMessage( database_cache_.update(catalog_database_cache_proto); auto query_context = std::make_unique<QueryContext>( - query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_, bus_); + query_context_proto, database_cache_, storage_manager_, shiftboss_client_id_local_, bus_local_); query_contexts_.emplace(query_id, move(query_context)); serialization::QueryInitiateResponseMessage proto; @@ -317,12 +325,12 @@ void Shiftboss::processQueryInitiateMessage( kQueryInitiateResponseMessage); free(proto_bytes); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage << "') to Foreman with TMB client ID " << foreman_client_id_; const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, + QueryExecutionUtil::SendTMBMessage(bus_global_, + shiftboss_client_id_global_, foreman_client_id_, move(message_response)); CHECK(send_status == MessageBus::SendStatus::kOK); @@ -356,12 +364,12 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, kInitiateRebuildResponseMessage); free(proto_bytes); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage << "') to Foreman with TMB client ID " << foreman_client_id_; const MessageBus::SendStatus send_status = - 
QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, + QueryExecutionUtil::SendTMBMessage(bus_global_, + shiftboss_client_id_global_, foreman_client_id_, move(message_response)); CHECK(send_status == MessageBus::SendStatus::kOK); @@ -374,8 +382,8 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, move(partially_filled_block_refs[i]), op_index, rel_id, - shiftboss_client_id_, - bus_); + shiftboss_client_id_local_, + bus_local_); unique_ptr<WorkerMessage> worker_message( WorkerMessage::RebuildWorkOrderMessage(rebuild_work_order, op_index)); @@ -385,13 +393,13 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, kRebuildWorkOrderMessage); const size_t worker_index = getSchedulableWorker(); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_ + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_local_ << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage << "') to worker " << worker_index; const MessageBus::SendStatus send_status = - QueryExecutionUtil::SendTMBMessage(bus_, - shiftboss_client_id_, + QueryExecutionUtil::SendTMBMessage(bus_local_, + shiftboss_client_id_local_, workers_->getClientID(worker_index), move(worker_tagged_message)); CHECK(send_status == MessageBus::SendStatus::kOK); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/query_execution/Shiftboss.hpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.hpp b/query_execution/Shiftboss.hpp index 6538d48..4864988 100644 --- a/query_execution/Shiftboss.hpp +++ b/query_execution/Shiftboss.hpp @@ -61,7 +61,8 @@ class Shiftboss : public Thread { /** * @brief Constructor. * - * @param bus A pointer to the TMB. + * @param bus_global A pointer to the TMB for Foreman. + * @param bus_local A pointer to the TMB for Workers. * @param storage_manager The StorageManager to use. 
* @param workers A pointer to the WorkerDirectory. * @param cpu_id The ID of the CPU to which the Shiftboss thread can be pinned. @@ -69,69 +70,75 @@ class Shiftboss : public Thread { * @note If cpu_id is not specified, Shiftboss thread can be possibly moved * around on different CPUs by the OS. **/ - Shiftboss(tmb::MessageBus *bus, + Shiftboss(tmb::MessageBus *bus_global, + tmb::MessageBus *bus_local, StorageManager *storage_manager, WorkerDirectory *workers, const int cpu_id = -1) - : bus_(DCHECK_NOTNULL(bus)), + : bus_global_(DCHECK_NOTNULL(bus_global)), + bus_local_(DCHECK_NOTNULL(bus_local)), storage_manager_(DCHECK_NOTNULL(storage_manager)), workers_(DCHECK_NOTNULL(workers)), cpu_id_(cpu_id), - shiftboss_client_id_(tmb::kClientIdNone), + shiftboss_client_id_global_(tmb::kClientIdNone), + shiftboss_client_id_local_(tmb::kClientIdNone), foreman_client_id_(tmb::kClientIdNone), max_msgs_per_worker_(1), start_worker_index_(0u) { // Check to have at least one Worker. DCHECK_GT(workers->getNumWorkers(), 0u); - shiftboss_client_id_ = bus_->Connect(); - LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_; - DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone); + shiftboss_client_id_global_ = bus_global_->Connect(); + LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_global_; + DCHECK_NE(shiftboss_client_id_global_, tmb::kClientIdNone); + + shiftboss_client_id_local_ = bus_local_->Connect(); + DCHECK_NE(shiftboss_client_id_local_, tmb::kClientIdNone); // Messages between Foreman and Shiftboss. 
- bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationMessage); - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kShiftbossRegistrationResponseMessage); + bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kShiftbossRegistrationMessage); + bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kShiftbossRegistrationResponseMessage); - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryInitiateMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kQueryInitiateResponseMessage); + bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kQueryInitiateMessage); + bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kQueryInitiateResponseMessage); - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage); + bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kInitiateRebuildMessage); + bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kInitiateRebuildResponseMessage); - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kSaveQueryResultMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kSaveQueryResultResponseMessage); + bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kSaveQueryResultMessage); + bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kSaveQueryResultResponseMessage); // Message sent to Worker. - bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationResponseMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage); + bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kShiftbossRegistrationResponseMessage); + bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kRebuildWorkOrderMessage); // Forward the following message types from Foreman to Workers. 
- bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage); + bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kWorkOrderMessage); + bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kWorkOrderMessage); // Forward the following message types from Workers to Foreman. - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kCatalogRelationNewBlockMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage); + bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kCatalogRelationNewBlockMessage); + bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kCatalogRelationNewBlockMessage); - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kDataPipelineMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage); + bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kDataPipelineMessage); + bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kDataPipelineMessage); - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderFeedbackMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage); + bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kWorkOrderFeedbackMessage); + bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kWorkOrderFeedbackMessage); - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage); + bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kWorkOrderCompleteMessage); + bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kWorkOrderCompleteMessage); - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage); - bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage); + 
bus_local_->RegisterClientAsReceiver(shiftboss_client_id_local_, kRebuildWorkOrderCompleteMessage); + bus_global_->RegisterClientAsSender(shiftboss_client_id_global_, kRebuildWorkOrderCompleteMessage); // Clean up query execution states, i.e., QueryContext. - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryTeardownMessage); + bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kQueryTeardownMessage); // Stop itself. - bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage); + bus_global_->RegisterClientAsReceiver(shiftboss_client_id_global_, kPoisonMessage); // Stop all workers. - bus_->RegisterClientAsSender(shiftboss_client_id_, kPoisonMessage); + bus_local_->RegisterClientAsSender(shiftboss_client_id_local_, kPoisonMessage); for (std::size_t i = 0; i < workers_->getNumWorkers(); ++i) { worker_addresses_.AddRecipient(workers_->getClientID(i)); @@ -149,7 +156,7 @@ class Shiftboss : public Thread { * @return TMB client ID of shiftboss thread. **/ inline tmb::client_id getBusClientID() const { - return shiftboss_client_id_; + return shiftboss_client_id_global_; } /** @@ -220,9 +227,7 @@ class Shiftboss : public Thread { const QueryContext::insert_destination_id dest_index, const relation_id rel_id); - // TODO(zuyu): Use two buses for the message communication between Foreman and Shiftboss, - // and Shiftboss and Worker thread pool. - tmb::MessageBus *bus_; + tmb::MessageBus *bus_global_, *bus_local_; CatalogDatabaseCache database_cache_; StorageManager *storage_manager_; @@ -231,7 +236,7 @@ class Shiftboss : public Thread { // The ID of the CPU that the Shiftboss thread can optionally be pinned to. const int cpu_id_; - tmb::client_id shiftboss_client_id_, foreman_client_id_; + tmb::client_id shiftboss_client_id_global_, shiftboss_client_id_local_, foreman_client_id_; // Unique per Shiftboss instance. 
std::uint64_t shiftboss_index_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp index 2e18467..71965e6 100644 --- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp +++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp @@ -76,6 +76,7 @@ const char *DistributedExecutionGeneratorTestRunner::kResetOption = DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner(const string &storage_path) : query_id_(0), + bus_locals_(kNumInstances), data_exchangers_(kNumInstances) { bus_.Initialize(); @@ -113,7 +114,10 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner kAnyNUMANodeID); for (int i = 0; i < kNumInstances; ++i) { - workers_.push_back(make_unique<Worker>(0 /* worker_thread_index */, &bus_)); + tmb::MessageBus *bus_local = &bus_locals_[i]; + bus_local->Initialize(); + + workers_.push_back(make_unique<Worker>(0 /* worker_thread_index */, bus_local)); const vector<tmb::client_id> worker_client_ids(1, workers_.back()->getBusClientID()); worker_directories_.push_back( @@ -128,7 +132,7 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner data_exchangers_[i].set_storage_manager(storage_manager.get()); shiftbosses_.push_back( - make_unique<Shiftboss>(&bus_, storage_manager.get(), worker_directories_.back().get())); + make_unique<Shiftboss>(&bus_, bus_local, storage_manager.get(), worker_directories_.back().get())); storage_managers_.push_back(move(storage_manager)); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d8fc9461/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp 
---------------------------------------------------------------------- diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp index 63e320d..2cd2427 100644 --- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp +++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp @@ -129,6 +129,7 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner { std::unique_ptr<ForemanDistributed> foreman_; + std::vector<MessageBusImpl> bus_locals_; std::vector<std::unique_ptr<Worker>> workers_; std::vector<std::unique_ptr<WorkerDirectory>> worker_directories_; std::vector<DataExchangerAsync> data_exchangers_;