Logged all sent messages using glog.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d9135a8a Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d9135a8a Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d9135a8a Branch: refs/heads/quickstep-28-29 Commit: d9135a8a2d11a1eabf6705c88391c498f4be38bb Parents: 6168996 Author: Zuyu Zhang <zu...@twitter.com> Authored: Mon Aug 8 22:49:59 2016 -0700 Committer: Zuyu Zhang <zu...@twitter.com> Committed: Tue Aug 9 19:48:01 2016 -0700 ---------------------------------------------------------------------- query_execution/ForemanSingleNode.cpp | 14 ++-- query_execution/PolicyEnforcerDistributed.cpp | 38 ++++------ query_execution/QueryExecutionUtil.hpp | 6 +- query_execution/Shiftboss.cpp | 86 +++++++++------------- query_execution/Worker.cpp | 6 +- relational_operators/DeleteOperator.cpp | 10 +-- relational_operators/RebuildWorkOrder.hpp | 7 +- relational_operators/UpdateOperator.cpp | 10 +-- relational_operators/WorkOrder.hpp | 7 +- storage/InsertDestination.cpp | 18 +++-- storage/InsertDestination.hpp | 10 +-- 11 files changed, 92 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/ForemanSingleNode.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp index d064a6f..7596b00 100644 --- a/query_execution/ForemanSingleNode.cpp +++ b/query_execution/ForemanSingleNode.cpp @@ -168,16 +168,15 @@ void ForemanSingleNode::run() { // Signal the main thread that there are no queries to be executed. // Currently the message doesn't have any real content. TaggedMessage completion_tagged_message(kWorkloadCompletionMessage); + DLOG(INFO) << "ForemanSingleNode sent WorkloadCompletionMessage (typed '" << kWorkloadCompletionMessage + << "') to CLI with TMB client ID " << main_thread_client_id_; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage( bus_, foreman_client_id_, main_thread_client_id_, move(completion_tagged_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) - << "Message could not be sent from Foreman with TMB client ID " - << foreman_client_id_ << " to main thread with TMB client ID" - << main_thread_client_id_; + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); } } } @@ -225,15 +224,14 @@ void ForemanSingleNode::sendWorkerMessage(const size_t worker_thread_index, } TaggedMessage worker_tagged_message(&message, sizeof(message), type); + DLOG(INFO) << "ForemanSingleNode sent WorkOrderMessage (typed '" << type + << "') to Worker with TMB client ID " << worker_directory_->getClientID(worker_thread_index); const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, worker_directory_->getClientID(worker_thread_index), move(worker_tagged_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << - "Message could not be sent from Foreman with TMB client ID " - << foreman_client_id_ << " to Foreman with TMB client ID " - << worker_directory_->getClientID(worker_thread_index); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); } const std::vector<WorkOrderTimeEntry>& ForemanSingleNode http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp index 6d0de47..c76a9e1 100644 --- a/query_execution/PolicyEnforcerDistributed.cpp +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -170,25 +170,22 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand kQueryInitiateMessage); free(proto_bytes); - LOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage - << "') to Shiftboss 0"; - // TODO(zuyu): Multiple Shiftbosses support. + DLOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage (typed '" << kQueryInitiateMessage + << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0); const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, shiftboss_directory_->getClientId(0), move(message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) - << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ - << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); // Wait Shiftboss for QueryInitiateResponseMessage. const tmb::AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true); const TaggedMessage &tagged_message = annotated_message.tagged_message; DCHECK_EQ(kQueryInitiateResponseMessage, tagged_message.message_type()); - LOG(INFO) << "PolicyEnforcerDistributed received typed '" << tagged_message.message_type() - << "' message from client " << annotated_message.sender; + DLOG(INFO) << "PolicyEnforcerDistributed received typed '" << tagged_message.message_type() + << "' message from client " << annotated_message.sender; S::QueryInitiateResponseMessage proto_response; CHECK(proto_response.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); @@ -215,30 +212,27 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage kQueryTeardownMessage); // TODO(zuyu): Support multiple shiftbosses. - LOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage - << "') to Shiftboss 0"; + DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage + << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0); tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, shiftboss_directory_->getClientId(0), move(message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) - << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ - << " to Shiftboss"; + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); TaggedMessage cli_message(kQueryExecutionSuccessMessage); // Notify the CLI query execution successfully. - LOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" << kQueryExecutionSuccessMessage - << "') to CLI with TMB client id " << cli_id; + DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" + << kQueryExecutionSuccessMessage + << "') to CLI with TMB client id " << cli_id; send_status = QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id, move(cli_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) - << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ - << " to CLI with TMB client ID " << cli_id; + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); return; } @@ -263,17 +257,15 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage kSaveQueryResultMessage); free(proto_bytes); - LOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage - << "') to Shiftboss 0"; // TODO(zuyu): Support multiple shiftbosses. + DLOG(INFO) << "PolicyEnforcerDistributed sent SaveQueryResultMessage (typed '" << kSaveQueryResultMessage + << "') to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0); const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, shiftboss_directory_->getClientId(0), move(message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) - << "Message could not be sent from Foreman with TMB client ID " << foreman_client_id_ - << " to Shiftboss with TMB client ID " << shiftboss_directory_->getClientId(0); + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/QueryExecutionUtil.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp index 906fb6b..feb4cc0 100644 --- a/query_execution/QueryExecutionUtil.hpp +++ b/query_execution/QueryExecutionUtil.hpp @@ -128,11 +128,11 @@ class QueryExecutionUtil { address.All(true); TaggedMessage poison_tagged_message(kPoisonMessage); + DLOG(INFO) << "TMB client ID " << sender_id + << " broadcast PoisonMessage (typed '" << kPoisonMessage << "') to all"; const tmb::MessageBus::SendStatus send_status = bus->Send( sender_id, address, style, std::move(poison_tagged_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << - "Broadcast poison message from sender with TMB client ID " << sender_id - << " failed"; + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); } private: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/Shiftboss.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp index bd83dd4..ddfd47f 100644 --- a/query_execution/Shiftboss.cpp +++ b/query_execution/Shiftboss.cpp @@ -73,9 +73,9 @@ void Shiftboss::run() { // Receive() is a blocking call, causing this thread to sleep until next // message is received. AnnotatedMessage annotated_message(bus_->Receive(shiftboss_client_id_, 0, true)); - LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ - << "') received the typed '" << annotated_message.tagged_message.message_type() - << "' message from client " << annotated_message.sender; + DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ + << "') received the typed '" << annotated_message.tagged_message.message_type() + << "' message from client " << annotated_message.sender; switch (annotated_message.tagged_message.message_type()) { case kShiftbossRegistrationResponseMessage: { foreman_client_id_ = annotated_message.sender; @@ -121,18 +121,16 @@ void Shiftboss::run() { kWorkOrderMessage); const size_t worker_index = getSchedulableWorker(); - LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ - << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage - << "') from Foreman to worker " << worker_index; + DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ + << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage + << "') from Foreman to worker " << worker_index; const MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, shiftboss_client_id_, workers_->getClientID(worker_index), move(worker_tagged_message)); - CHECK(send_status == MessageBus::SendStatus::kOK) - << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_ - << " to Worker with TMB client ID " << workers_->getClientID(worker_index); + CHECK(send_status == MessageBus::SendStatus::kOK); break; } case kInitiateRebuildMessage: { @@ -153,9 +151,10 @@ void Shiftboss::run() { case kRebuildWorkOrderCompleteMessage: case kDataPipelineMessage: case kWorkOrderFeedbackMessage: { - LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ - << "') forwarded typed '" << annotated_message.tagged_message.message_type() - << "' message from worker (client " << annotated_message.sender << ") to Foreman"; + DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ + << "') forwarded typed '" << annotated_message.tagged_message.message_type() + << "' message from Worker with TMB client ID '" << annotated_message.sender + << "' to Foreman with TMB client ID " << foreman_client_id_; DCHECK_NE(foreman_client_id_, tmb::kClientIdNone); const MessageBus::SendStatus send_status = @@ -163,9 +162,7 @@ void Shiftboss::run() { shiftboss_client_id_, foreman_client_id_, move(annotated_message.tagged_message)); - CHECK(send_status == MessageBus::SendStatus::kOK) - << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_ - << " to Foreman with TMB client ID " << foreman_client_id_; + CHECK(send_status == MessageBus::SendStatus::kOK); break; } case kSaveQueryResultMessage: { @@ -190,23 +187,21 @@ void Shiftboss::run() { kSaveQueryResultResponseMessage); free(proto_response_bytes); - LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ - << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage - << "') to Foreman"; + DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ + << "') sent SaveQueryResultResponseMessage (typed '" << kSaveQueryResultResponseMessage + << "') to Foreman with TMB client ID " << foreman_client_id_; const MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, shiftboss_client_id_, foreman_client_id_, move(message_response)); - CHECK(send_status == MessageBus::SendStatus::kOK) - << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_ - << " to Foreman with TMB client ID " << foreman_client_id_; + CHECK(send_status == MessageBus::SendStatus::kOK); break; } case kPoisonMessage: { - LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ - << "') forwarded PoisonMessage (typed '" << kPoisonMessage - << "') from Foreman to all workers"; + DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ + << "') forwarded PoisonMessage (typed '" << kPoisonMessage + << "') from Foreman to all workers"; tmb::MessageStyle broadcast_style; broadcast_style.Broadcast(true); @@ -216,9 +211,7 @@ void Shiftboss::run() { worker_addresses_, broadcast_style, move(annotated_message.tagged_message)); - CHECK(send_status == MessageBus::SendStatus::kOK) - << "Message could not be broadcast from Shiftboss with TMB client ID " << shiftboss_client_id_ - << " to All workers"; + CHECK(send_status == MessageBus::SendStatus::kOK); return; } default: { @@ -245,10 +238,6 @@ size_t Shiftboss::getSchedulableWorker() { } void Shiftboss::registerWithForeman() { - LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ - << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage - << "') to all"; - tmb::Address all_addresses; all_addresses.All(true); @@ -266,6 +255,9 @@ void Shiftboss::registerWithForeman() { kShiftbossRegistrationMessage); free(proto_bytes); + DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ + << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage + << "') to all"; tmb::MessageBus::SendStatus send_status = bus_->Send(shiftboss_client_id_, all_addresses, style, move(message)); DCHECK(send_status == tmb::MessageBus::SendStatus::kOK); @@ -285,10 +277,6 @@ void Shiftboss::processQueryInitiateMessage( bus_)); query_contexts_.emplace(query_id, move(query_context)); - LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ - << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage - << "') to Foreman"; - serialization::QueryInitiateResponseMessage proto; proto.set_query_id(query_id); @@ -301,14 +289,15 @@ void Shiftboss::processQueryInitiateMessage( kQueryInitiateResponseMessage); free(proto_bytes); + DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ + << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage + << "') to Foreman with TMB client ID " << foreman_client_id_; const MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, shiftboss_client_id_, foreman_client_id_, move(message_response)); - CHECK(send_status == MessageBus::SendStatus::kOK) - << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_ - << " to Foreman with TMB client ID " << foreman_client_id_; + CHECK(send_status == MessageBus::SendStatus::kOK); } void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, @@ -324,10 +313,6 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, vector<MutableBlockReference> partially_filled_block_refs; insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs); - LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ - << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage - << "') to Foreman"; - serialization::InitiateRebuildResponseMessage proto; proto.set_query_id(query_id); proto.set_operator_index(op_index); @@ -343,14 +328,15 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, kInitiateRebuildResponseMessage); free(proto_bytes); + DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ + << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage + << "') to Foreman with TMB client ID " << foreman_client_id_; const MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, shiftboss_client_id_, foreman_client_id_, move(message_response)); - CHECK(send_status == MessageBus::SendStatus::kOK) - << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_ - << " to Foreman with TMB client ID " << foreman_client_id_; + CHECK(send_status == MessageBus::SendStatus::kOK); for (size_t i = 0; i < partially_filled_block_refs.size(); ++i) { // NOTE(zuyu): Worker releases the memory after the execution of @@ -371,18 +357,16 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, kRebuildWorkOrderMessage); const size_t worker_index = getSchedulableWorker(); - LOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ - << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage - << "') to worker " << worker_index; + DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_ + << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage + << "') to worker " << worker_index; const MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, shiftboss_client_id_, workers_->getClientID(worker_index), move(worker_tagged_message)); - CHECK(send_status == MessageBus::SendStatus::kOK) - << "Message could not be sent from Shiftboss with TMB client ID " << shiftboss_client_id_ - << " to Worker with TMB client ID " << workers_->getClientID(worker_index); + CHECK(send_status == MessageBus::SendStatus::kOK); } } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/query_execution/Worker.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp index d497be6..0b1efba 100644 --- a/query_execution/Worker.cpp +++ b/query_execution/Worker.cpp @@ -101,12 +101,12 @@ void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver, static_cast<const void *>(proto_bytes), proto_length, message_type); std::free(proto_bytes); + DLOG(INFO) << "Worker sent WorkOrderCompleteMessage (typed '" << message_type + << "') to Scheduler with TMB client ID " << receiver; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage( bus_, worker_client_id_, receiver, std::move(tagged_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) - << "Message could not be sent from worker with TMB client ID " - << worker_client_id_ << " to Foreman with TMB client ID " << receiver; + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); } template <typename CompletionMessageProtoT> http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/relational_operators/DeleteOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp index 8197aef..24da9bf 100644 --- a/relational_operators/DeleteOperator.cpp +++ b/relational_operators/DeleteOperator.cpp @@ -146,17 +146,15 @@ void DeleteWorkOrder::execute() { kDataPipelineMessage); std::free(proto_bytes); - const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue(); + DLOG(INFO) << "DeleteWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage + << "') to Scheduler with TMB client ID " << scheduler_client_id_; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage( bus_, - worker_thread_client_id, + ClientIDMap::Instance()->getValue(), scheduler_client_id_, std::move(tagged_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not" - " be sent from thread with TMB client ID " << - worker_thread_client_id << " to Foreman with TMB client ID " - << scheduler_client_id_; + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/relational_operators/RebuildWorkOrder.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/RebuildWorkOrder.hpp b/relational_operators/RebuildWorkOrder.hpp index fe4be68..2cef1f1 100644 --- a/relational_operators/RebuildWorkOrder.hpp +++ b/relational_operators/RebuildWorkOrder.hpp @@ -101,15 +101,14 @@ class RebuildWorkOrder : public WorkOrder { // Refer to InsertDestination::sendBlockFilledMessage for the rationale // behind using the ClientIDMap map. + DLOG(INFO) << "RebuildWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage + << "') to Scheduler with TMB client ID " << scheduler_client_id_; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, ClientIDMap::Instance()->getValue(), scheduler_client_id_, std::move(tagged_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could " - " not be sent from thread with TMB client ID " << - ClientIDMap::Instance()->getValue() << " to Foreman with TMB client ID " - << scheduler_client_id_; + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); } private: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/relational_operators/UpdateOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp index bc29365..143c741 100644 --- a/relational_operators/UpdateOperator.cpp +++ b/relational_operators/UpdateOperator.cpp @@ -128,17 +128,15 @@ void UpdateWorkOrder::execute() { kDataPipelineMessage); std::free(proto_bytes); - const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue(); + DLOG(INFO) << "UpdateWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage + << "') to Scheduler with TMB client ID " << scheduler_client_id_; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage( bus_, - worker_thread_client_id, + ClientIDMap::Instance()->getValue(), scheduler_client_id_, std::move(tagged_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not" - " be sent from thread with TMB client ID " << - worker_thread_client_id << " to Foreman with TMB client ID " - << scheduler_client_id_; + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); } } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/relational_operators/WorkOrder.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp index 3cbab94..c1b9b68 100644 --- a/relational_operators/WorkOrder.hpp +++ b/relational_operators/WorkOrder.hpp @@ -44,7 +44,6 @@ namespace quickstep { * @{ */ - /** * @brief A single unit of work in a query plan, produced by a * RelationalOperator. Where possible, WorkOrders should be of @@ -284,14 +283,14 @@ class WorkOrder { tmb::MessageStyle single_receiver_style; DCHECK(bus != nullptr); + DLOG(INFO) << "WorkOrder sent WorkOrderFeedbackMessage (typed '" << kWorkOrderFeedbackMessage + << "') to Scheduler with TMB client ID " << receiver_id; const tmb::MessageBus::SendStatus send_status = bus->Send(sender_id, receiver_address, single_receiver_style, std::move(msg)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could" - " not be sent from thread with TMB client ID " << sender_id << " to" - " receiver thread with TMB client ID " << receiver_id; + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); } /** http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/storage/InsertDestination.cpp ---------------------------------------------------------------------- diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp index 9897aed..5e83453 100644 --- a/storage/InsertDestination.cpp +++ b/storage/InsertDestination.cpp @@ -282,13 +282,15 @@ MutableBlockReference AlwaysCreateBlockInsertDestination::createNewBlock() { kCatalogRelationNewBlockMessage); free(proto_bytes); + DLOG(INFO) << "AlwaysCreateBlockInsertDestination sent CatalogRelationNewBlockMessage (typed '" + << kCatalogRelationNewBlockMessage + << "') to Scheduler with TMB client ID " << scheduler_client_id_; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, thread_id_map_.getValue(), scheduler_client_id_, move(tagged_msg)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) - << "CatalogRelationNewBlockMessage could not be sent from InsertDestination to Foreman."; + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); return storage_manager_->getBlockMutable(new_id, relation_); } @@ -330,13 +332,15 @@ MutableBlockReference BlockPoolInsertDestination::createNewBlock() { kCatalogRelationNewBlockMessage); free(proto_bytes); + DLOG(INFO) << "BlockPoolInsertDestination sent CatalogRelationNewBlockMessage (typed '" + << kCatalogRelationNewBlockMessage + << "') to Scheduler with TMB client ID " << scheduler_client_id_; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, thread_id_map_.getValue(), scheduler_client_id_, move(tagged_msg)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) - << "CatalogRelationNewBlockMessage could not be sent from InsertDestination to Foreman."; + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); return storage_manager_->getBlockMutable(new_id, relation_); } @@ -445,13 +449,15 @@ MutableBlockReference PartitionAwareInsertDestination::createNewBlockInPartition kCatalogRelationNewBlockMessage); free(proto_bytes); + DLOG(INFO) << "PartitionAwareInsertDestination sent CatalogRelationNewBlockMessage (typed '" + << kCatalogRelationNewBlockMessage + << "') to Scheduler with TMB client ID " << scheduler_client_id_; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, thread_id_map_.getValue(), scheduler_client_id_, move(tagged_msg)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) - << "CatalogRelationNewBlockMessage could not be sent from InsertDestination to Foreman."; + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); return storage_manager_->getBlockMutable(new_id, relation_); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d9135a8a/storage/InsertDestination.hpp ---------------------------------------------------------------------- diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp index 3dae9a0..408e76b 100644 --- a/storage/InsertDestination.hpp +++ b/storage/InsertDestination.hpp @@ -249,16 +249,14 @@ class InsertDestination : public InsertDestinationInterface { // option 3. DCHECK(bus_ != nullptr); - const tmb::client_id worker_thread_client_id = thread_id_map_.getValue(); + DLOG(INFO) << "InsertDestination sent DataPipelineMessage (typed '" << kDataPipelineMessage + << "') to Scheduler with TMB client ID " << scheduler_client_id_; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, - worker_thread_client_id, + thread_id_map_.getValue(), scheduler_client_id_, std::move(tagged_message)); - CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << - "Message could not be sent from thread with TMB client ID " - << worker_thread_client_id << " to Scheduler with TMB client" - " ID " << scheduler_client_id_; + CHECK(send_status == tmb::MessageBus::SendStatus::kOK); } inline const std::size_t getQueryID() const {