Repository: incubator-quickstep Updated Branches: refs/heads/master 8643add3f -> 686bbb587
Refactored the debug logs in the query execution. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/686bbb58 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/686bbb58 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/686bbb58 Branch: refs/heads/master Commit: 686bbb587035b4980503373461c7df4466115023 Parents: 8643add Author: Zuyu Zhang <zu...@apache.org> Authored: Sun Mar 5 01:54:45 2017 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Sun Mar 5 02:03:57 2017 -0800 ---------------------------------------------------------------------- cli/distributed/Cli.cpp | 22 ++++---- cli/distributed/Conductor.cpp | 21 ++++---- cli/distributed/QuickstepDistributedCli.cpp | 2 + .../DistributedCommandExecutorTestRunner.cpp | 11 +--- query_execution/BlockLocator.cpp | 24 ++++----- query_execution/BlockLocatorUtil.cpp | 11 ++-- query_execution/ForemanDistributed.cpp | 15 +++--- query_execution/ForemanSingleNode.cpp | 7 ++- query_execution/PolicyEnforcerDistributed.cpp | 17 ++---- query_execution/QueryExecutionUtil.hpp | 48 ++++++++++++++++- query_execution/QueryManagerDistributed.cpp | 3 +- query_execution/Shiftboss.cpp | 56 +++++++++----------- query_execution/Worker.cpp | 7 +-- query_execution/tests/BlockLocator_unittest.cpp | 9 ++-- relational_operators/DeleteOperator.cpp | 3 +- relational_operators/RebuildWorkOrder.hpp | 3 +- relational_operators/UpdateOperator.cpp | 3 +- relational_operators/WorkOrder.hpp | 3 +- storage/InsertDestination.cpp | 15 +++--- storage/InsertDestination.hpp | 3 +- storage/StorageManager.cpp | 21 ++++---- storage/tests/DataExchange_unittest.cpp | 5 +- 22 files changed, 151 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/cli/distributed/Cli.cpp 
---------------------------------------------------------------------- diff --git a/cli/distributed/Cli.cpp b/cli/distributed/Cli.cpp index 14880a7..9f48ecc 100644 --- a/cli/distributed/Cli.cpp +++ b/cli/distributed/Cli.cpp @@ -88,9 +88,7 @@ void Cli::init() { bus_.RegisterClientAsSender(cli_id_, kDistributedCliRegistrationMessage); bus_.RegisterClientAsReceiver(cli_id_, kDistributedCliRegistrationResponseMessage); - DLOG(INFO) << "DistributedCli sent DistributedCliRegistrationMessage (typed '" - << kDistributedCliRegistrationMessage - << "') to all"; + DLOG(INFO) << "DistributedCli sent DistributedCliRegistrationMessage to all"; tmb::Address all_addresses; all_addresses.All(true); @@ -103,12 +101,12 @@ void Cli::init() { // Wait for Conductor to response. const AnnotatedMessage cli_reg_response_message(bus_.Receive(cli_id_, 0, true)); - CHECK_EQ(kDistributedCliRegistrationResponseMessage, - cli_reg_response_message.tagged_message.message_type()); + DCHECK_EQ(kDistributedCliRegistrationResponseMessage, + cli_reg_response_message.tagged_message.message_type()); conductor_client_id_ = cli_reg_response_message.sender; - DLOG(INFO) << "DistributedCli received typed '" << kDistributedCliRegistrationResponseMessage - << "' message from Conductor (id'" << conductor_client_id_ << "')."; + DLOG(INFO) << "DistributedCli received DistributedCliRegistrationResponseMessage from Conductor with Client " + << conductor_client_id_; // Setup StorageManager. 
bus_.RegisterClientAsSender(cli_id_, kBlockDomainRegistrationMessage); @@ -179,8 +177,7 @@ void Cli::run() { } } - DLOG(INFO) << "DistributedCli sent SqlQueryMessage (typed '" << kSqlQueryMessage - << "') to Conductor"; + DLOG(INFO) << "DistributedCli sent SqlQueryMessage to Conductor"; S::SqlQueryMessage proto; proto.set_sql_query(*command_string); @@ -197,9 +194,10 @@ void Cli::run() { const AnnotatedMessage annotated_message(bus_.Receive(cli_id_, 0, true)); const TaggedMessage &tagged_message = annotated_message.tagged_message; - DLOG(INFO) << "DistributedCli received typed '" << tagged_message.message_type() - << "' message from client " << annotated_message.sender; - switch (tagged_message.message_type()) { + const tmb::message_type_id message_type = tagged_message.message_type(); + DLOG(INFO) << "DistributedCli received " << QueryExecutionUtil::MessageTypeToString(message_type) + << " from Client " << annotated_message.sender; + switch (message_type) { case kCommandResponseMessage: { S::CommandResponseMessage proto; CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/cli/distributed/Conductor.cpp ---------------------------------------------------------------------- diff --git a/cli/distributed/Conductor.cpp b/cli/distributed/Conductor.cpp index 5fb4453..ef253f1 100644 --- a/cli/distributed/Conductor.cpp +++ b/cli/distributed/Conductor.cpp @@ -124,17 +124,16 @@ void Conductor::run() { for (;;) { AnnotatedMessage annotated_message(bus_.Receive(conductor_client_id_, 0, true)); const TaggedMessage &tagged_message = annotated_message.tagged_message; + const tmb::message_type_id message_type = tagged_message.message_type(); const client_id sender = annotated_message.sender; - - DLOG(INFO) << "Conductor received typed '" << tagged_message.message_type() - << "' message from client " << sender; - switch (tagged_message.message_type()) { + DLOG(INFO) << 
"Conductor received " << QueryExecutionUtil::MessageTypeToString(message_type) + << " from Client " << sender; + switch (message_type) { case kDistributedCliRegistrationMessage: { TaggedMessage message(kDistributedCliRegistrationResponseMessage); - DLOG(INFO) << "Conductor sent DistributedCliRegistrationResponseMessage (typed '" - << kDistributedCliRegistrationResponseMessage - << "') to Distributed CLI " << sender; + DLOG(INFO) << "Conductor sent DistributedCliRegistrationResponseMessage to DistributedCLI with Client " + << sender; CHECK(MessageBus::SendStatus::kOK == QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message))); break; @@ -201,8 +200,7 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage); free(proto_bytes); - DLOG(INFO) << "Conductor sent CommandResponseMessage (typed '" << kCommandResponseMessage - << "') to Distributed CLI " << sender; + DLOG(INFO) << "Conductor sent CommandResponseMessage to DistributedCLI with Client " << sender; CHECK(MessageBus::SendStatus::kOK == QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message))); } else { @@ -232,9 +230,8 @@ void Conductor::processSqlQueryMessage(const tmb::client_id sender, string *comm kQueryExecutionErrorMessage); free(proto_bytes); - DLOG(INFO) << "Conductor (on behalf of Optimizer) sent QueryExecutionErrorMessage (typed '" - << kQueryExecutionErrorMessage - << "') to Distributed CLI " << sender; + DLOG(INFO) << "Conductor (on behalf of Optimizer) sent QueryExecutionErrorMessage to DistributedCLI with Client " + << sender; CHECK(MessageBus::SendStatus::kOK == QueryExecutionUtil::SendTMBMessage(&bus_, conductor_client_id_, sender, move(message))); } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/cli/distributed/QuickstepDistributedCli.cpp 
---------------------------------------------------------------------- diff --git a/cli/distributed/QuickstepDistributedCli.cpp b/cli/distributed/QuickstepDistributedCli.cpp index f01cd13..b7edd83 100644 --- a/cli/distributed/QuickstepDistributedCli.cpp +++ b/cli/distributed/QuickstepDistributedCli.cpp @@ -75,6 +75,8 @@ int main(int argc, char *argv[]) { } role->init(); + LOG(INFO) << FLAGS_role << " is ready"; + role->run(); return 0; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/cli/tests/DistributedCommandExecutorTestRunner.cpp ---------------------------------------------------------------------- diff --git a/cli/tests/DistributedCommandExecutorTestRunner.cpp b/cli/tests/DistributedCommandExecutorTestRunner.cpp index 66d0767..0df488e 100644 --- a/cli/tests/DistributedCommandExecutorTestRunner.cpp +++ b/cli/tests/DistributedCommandExecutorTestRunner.cpp @@ -20,7 +20,6 @@ #include "cli/tests/DistributedCommandExecutorTestRunner.hpp" #include <cstdio> -#include <functional> #include <memory> #include <set> #include <string> @@ -63,12 +62,6 @@ namespace quickstep { class CatalogRelation; -namespace { - -void nop() {} - -} // namespace - namespace C = cli; const char *DistributedCommandExecutorTestRunner::kResetOption = @@ -104,8 +97,8 @@ DistributedCommandExecutorTestRunner::DistributedCommandExecutorTestRunner(const // NOTE(zuyu): Foreman should initialize before Shiftboss so that the former // could receive a registration message from the latter. - foreman_ = make_unique<ForemanDistributed>(*block_locator_, std::bind(&nop), &bus_, - test_database_loader_->catalog_database()); + foreman_ = make_unique<ForemanDistributed>(*block_locator_, &bus_, test_database_loader_->catalog_database(), + nullptr /* query_processor */); // We don't use the NUMA aware version of worker code. 
const vector<numa_node_id> numa_nodes(1 /* Number of worker threads per instance */, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/BlockLocator.cpp ---------------------------------------------------------------------- diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp index fa6db51..765021e 100644 --- a/query_execution/BlockLocator.cpp +++ b/query_execution/BlockLocator.cpp @@ -55,10 +55,11 @@ void BlockLocator::run() { // message is received. const tmb::AnnotatedMessage annotated_message = bus_->Receive(locator_client_id_, 0, true); const TaggedMessage &tagged_message = annotated_message.tagged_message; + const tmb::message_type_id message_type = tagged_message.message_type(); const client_id sender = annotated_message.sender; - DLOG(INFO) << "BlockLocator received the typed '" << tagged_message.message_type() - << "' message from TMB Client " << sender; - switch (tagged_message.message_type()) { + DLOG(INFO) << "BlockLocator received " << QueryExecutionUtil::MessageTypeToString(message_type) + << " from Client " << sender; + switch (message_type) { case kBlockDomainRegistrationMessage: { serialization::BlockDomainRegistrationMessage proto; CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); @@ -190,10 +191,8 @@ void BlockLocator::processBlockDomainRegistrationMessage(const client_id receive kBlockDomainRegistrationResponseMessage); free(proto_bytes); - DLOG(INFO) << "BlockLocator (id '" << locator_client_id_ - << "') sent BlockDomainRegistrationResponseMessage (typed '" - << kBlockDomainRegistrationResponseMessage - << "') to TMB Client (id '" << receiver << "')"; + DLOG(INFO) << "BlockLocator with Client " << locator_client_id_ + << " sent BlockDomainRegistrationResponseMessage to Client " << receiver; CHECK(tmb::MessageBus::SendStatus::kOK == QueryExecutionUtil::SendTMBMessage(bus_, locator_client_id_, @@ -220,9 +219,8 @@ void 
BlockLocator::processLocateBlockMessage(const client_id receiver, kLocateBlockResponseMessage); free(proto_bytes); - DLOG(INFO) << "BlockLocator (id '" << locator_client_id_ - << "') sent LocateBlockResponseMessage (typed '" << kLocateBlockResponseMessage - << "') to StorageManager (id '" << receiver << "')"; + DLOG(INFO) << "BlockLocator with Client " << locator_client_id_ + << " sent LocateBlockResponseMessage to StorageManager with Client " << receiver; CHECK(tmb::MessageBus::SendStatus::kOK == QueryExecutionUtil::SendTMBMessage(bus_, locator_client_id_, @@ -249,10 +247,8 @@ void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id r kGetPeerDomainNetworkAddressesResponseMessage); free(proto_bytes); - DLOG(INFO) << "BlockLocator (id '" << locator_client_id_ - << "') sent GetPeerDomainNetworkAddressesResponseMessage (typed '" - << kGetPeerDomainNetworkAddressesResponseMessage - << "') to StorageManager (id '" << receiver << "')"; + DLOG(INFO) << "BlockLocator with Client " << locator_client_id_ + << " sent GetPeerDomainNetworkAddressesResponseMessage to StorageManager with Client " << receiver; CHECK(tmb::MessageBus::SendStatus::kOK == QueryExecutionUtil::SendTMBMessage(bus_, locator_client_id_, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/BlockLocatorUtil.cpp ---------------------------------------------------------------------- diff --git a/query_execution/BlockLocatorUtil.cpp b/query_execution/BlockLocatorUtil.cpp index d2d1e96..63a4799 100644 --- a/query_execution/BlockLocatorUtil.cpp +++ b/query_execution/BlockLocatorUtil.cpp @@ -65,9 +65,7 @@ block_id_domain getBlockDomain(const std::string &network_address, kBlockDomainRegistrationMessage); std::free(proto_bytes); - DLOG(INFO) << "Client (id '" << cli_id - << "') broadcasts BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage - << "') to BlockLocator."; + DLOG(INFO) << "Client " << cli_id << " broadcasts 
BlockDomainRegistrationMessage to BlockLocator"; CHECK(MessageBus::SendStatus::kOK == bus->Send(cli_id, address, style, std::move(message))); @@ -78,10 +76,9 @@ block_id_domain getBlockDomain(const std::string &network_address, *locator_client_id = annotated_message.sender; - DLOG(INFO) << "Client (id '" << cli_id - << "') received BlockDomainRegistrationResponseMessage (typed '" - << kBlockDomainRegistrationResponseMessage - << "') from BlockLocator (id '" << *locator_client_id << "')."; + DLOG(INFO) << "Client " << cli_id + << " received BlockDomainRegistrationResponseMessage from BlockLocator with Client " + << *locator_client_id; S::BlockDomainMessage response_proto; CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/ForemanDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp index b59edb5..3903e8a 100644 --- a/query_execution/ForemanDistributed.cpp +++ b/query_execution/ForemanDistributed.cpp @@ -135,8 +135,7 @@ void ForemanDistributed::run() { const AnnotatedMessage annotated_message = bus_->Receive(foreman_client_id_, 0, true); const TaggedMessage &tagged_message = annotated_message.tagged_message; DCHECK_EQ(kShiftbossRegistrationMessage, tagged_message.message_type()); - DLOG(INFO) << "ForemanDistributed received typed '" << tagged_message.message_type() - << "' message from client " << annotated_message.sender; + DLOG(INFO) << "ForemanDistributed received ShiftbossRegistrationMessage from Client " << annotated_message.sender; S::ShiftbossRegistrationMessage proto; CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); @@ -152,8 +151,8 @@ void ForemanDistributed::run() { bus_->Receive(foreman_client_id_, 0, true); const TaggedMessage &tagged_message = 
annotated_message.tagged_message; const tmb::message_type_id message_type = tagged_message.message_type(); - DLOG(INFO) << "ForemanDistributed received typed '" << message_type - << "' message from client " << annotated_message.sender; + DLOG(INFO) << "ForemanDistributed received " << QueryExecutionUtil::MessageTypeToString(message_type) + << " from Client " << annotated_message.sender; switch (message_type) { case kShiftbossRegistrationMessage: { S::ShiftbossRegistrationMessage proto; @@ -397,8 +396,7 @@ void ForemanDistributed::sendWorkOrderMessage(const size_t shiftboss_index, free(proto_bytes); const client_id shiftboss_client_id = shiftboss_directory_.getClientId(shiftboss_index); - DLOG(INFO) << "ForemanDistributed sent WorkOrderMessage (typed '" << kWorkOrderMessage - << "') to Shiftboss with TMB client ID " << shiftboss_client_id; + DLOG(INFO) << "ForemanDistributed sent WorkOrderMessage to Shiftboss with Client " << shiftboss_client_id; const MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, @@ -439,9 +437,8 @@ void ForemanDistributed::processShiftbossRegistrationMessage(const client_id shi shiftboss_directory_.addShiftboss(shiftboss_client_id, work_order_capacity); - DLOG(INFO) << "ForemanDistributed sent ShiftbossRegistrationResponseMessage (typed '" - << kShiftbossRegistrationResponseMessage - << "') to Shiftboss with TMB client id " << shiftboss_client_id; + DLOG(INFO) << "ForemanDistributed sent ShiftbossRegistrationResponseMessage to Shiftboss with Client " + << shiftboss_client_id; const MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/ForemanSingleNode.cpp ---------------------------------------------------------------------- diff --git a/query_execution/ForemanSingleNode.cpp b/query_execution/ForemanSingleNode.cpp index 3eac0ff..1501408 100644 --- 
a/query_execution/ForemanSingleNode.cpp +++ b/query_execution/ForemanSingleNode.cpp @@ -166,8 +166,7 @@ void ForemanSingleNode::run() { // Signal the main thread that there are no queries to be executed. // Currently the message doesn't have any real content. TaggedMessage completion_tagged_message(kWorkloadCompletionMessage); - DLOG(INFO) << "ForemanSingleNode sent WorkloadCompletionMessage (typed '" << kWorkloadCompletionMessage - << "') to CLI with TMB client ID " << main_thread_client_id_; + DLOG(INFO) << "ForemanSingleNode sent WorkloadCompletionMessage to CLI with Client " << main_thread_client_id_; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage( bus_, @@ -222,8 +221,8 @@ void ForemanSingleNode::sendWorkerMessage(const size_t worker_thread_index, } TaggedMessage worker_tagged_message(&message, sizeof(message), type); - DLOG(INFO) << "ForemanSingleNode sent WorkOrderMessage (typed '" << type - << "') to Worker with TMB client ID " << worker_directory_->getClientID(worker_thread_index); + DLOG(INFO) << "ForemanSingleNode sent " << QueryExecutionUtil::MessageTypeToString(type) + << " to Worker with Client " << worker_directory_->getClientID(worker_thread_index); const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/PolicyEnforcerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp index e9faf8c..619e73f 100644 --- a/query_execution/PolicyEnforcerDistributed.cpp +++ b/query_execution/PolicyEnforcerDistributed.cpp @@ -231,8 +231,7 @@ void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_hand shiftboss_addresses.AddRecipient(shiftboss_directory_->getClientId(i)); } - DLOG(INFO) << "PolicyEnforcerDistributed sent 
QueryInitiateMessage (typed '" << kQueryInitiateMessage - << "') to all Shiftbosses"; + DLOG(INFO) << "PolicyEnforcerDistributed sent QueryInitiateMessage to all Shiftbosses"; QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), @@ -287,9 +286,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage free(proto_bytes); // Notify the CLI regarding the query result. - DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" - << kQueryExecutionSuccessMessage - << "') to CLI with TMB client id " << cli_id; + DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage to DistributedCLI with Client " << cli_id; const MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id, move(message)); CHECK(send_status == MessageBus::SendStatus::kOK); @@ -300,9 +297,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage } // Notify the CLI query execution successfully. 
- DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage (typed '" - << kQueryExecutionSuccessMessage - << "') to CLI with TMB client id " << cli_id; + DLOG(INFO) << "PolicyEnforcerDistributed sent QueryExecutionSuccessMessage to DistributedCLI with Client " << cli_id; const MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id, TaggedMessage(kQueryExecutionSuccessMessage)); @@ -320,8 +315,7 @@ void PolicyEnforcerDistributed::onQueryCompletion(QueryManagerBase *query_manage TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kQueryTeardownMessage); free(proto_bytes); - DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage (typed '" << kQueryTeardownMessage - << "') to all Shiftbosses"; + DLOG(INFO) << "PolicyEnforcerDistributed sent QueryTeardownMessage to all Shiftbosses"; QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses, move(message), bus_); } @@ -412,8 +406,7 @@ void PolicyEnforcerDistributed::processAnalyzeQueryResult(const tmb::client_id c TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, kCommandResponseMessage); free(proto_bytes); - DLOG(INFO) << "PolicyEnforcerDistributed sent CommandResponseMessage (typed '" << kCommandResponseMessage - << "') to CLI with TMB client id " << cli_id; + DLOG(INFO) << "PolicyEnforcerDistributed sent CommandResponseMessage to DistributedCLI with Client " << cli_id; const MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, foreman_client_id_, cli_id, move(message)); CHECK(send_status == MessageBus::SendStatus::kOK); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/QueryExecutionUtil.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp index b41965c..3f74af3 100644 --- 
a/query_execution/QueryExecutionUtil.hpp +++ b/query_execution/QueryExecutionUtil.hpp @@ -22,10 +22,12 @@ #include <cstddef> #include <memory> +#include <string> #include <utility> #include "query_execution/AdmitRequestMessage.hpp" #include "query_execution/QueryExecutionTypedefs.hpp" +#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED. #include "utility/Macros.hpp" #include "glog/logging.h" @@ -49,6 +51,49 @@ class QueryHandle; **/ class QueryExecutionUtil { public: + static std::string MessageTypeToString(const tmb::message_type_id message_type) { + switch (message_type) { + case kAdmitRequestMessage: return "AdmitRequestMessage"; + case kWorkOrderMessage: return "WorkOrderMessage"; + case kWorkOrderCompleteMessage: return "WorkOrderCompleteMessage"; + case kCatalogRelationNewBlockMessage: return "CatalogRelationNewBlockMessage"; + case kDataPipelineMessage: return "DataPipelineMessage"; + case kWorkOrderFeedbackMessage: return "WorkOrderFeedbackMessage"; + case kRebuildWorkOrderMessage: return "RebuildWorkOrderMessage"; + case kRebuildWorkOrderCompleteMessage: return "RebuildWorkOrderCompleteMessage"; + case kWorkloadCompletionMessage: return "WorkloadCompletionMessage"; + case kPoisonMessage: return "PoisonMessage"; +#ifdef QUICKSTEP_DISTRIBUTED + case kShiftbossRegistrationMessage: return "ShiftbossRegistrationMessage"; + case kShiftbossRegistrationResponseMessage: return "ShiftbossRegistrationResponseMessage"; + case kDistributedCliRegistrationMessage: return "DistributedCliRegistrationMessage"; + case kDistributedCliRegistrationResponseMessage: return "DistributedCliRegistrationResponseMessage"; + case kSqlQueryMessage: return "SqlQueryMessage"; + case kQueryInitiateMessage: return "QueryInitiateMessage"; + case kQueryInitiateResponseMessage: return "QueryInitiateResponseMessage"; + case kInitiateRebuildMessage: return "InitiateRebuildMessage"; + case kInitiateRebuildResponseMessage: return "InitiateRebuildResponseMessage"; + case 
kQueryTeardownMessage: return "QueryTeardownMessage"; + case kQueryExecutionSuccessMessage: return "QueryExecutionSuccessMessage"; + case kCommandResponseMessage: return "CommandResponseMessage"; + case kQueryExecutionErrorMessage: return "QueryExecutionErrorMessage"; + case kQueryResultTeardownMessage: return "QueryResultTeardownMessage"; + case kBlockDomainRegistrationMessage: return "BlockDomainRegistrationMessage"; + case kBlockDomainRegistrationResponseMessage: return "BlockDomainRegistrationResponseMessage"; + case kBlockDomainToShiftbossIndexMessage: return "BlockDomainToShiftbossIndexMessage"; + case kAddBlockLocationMessage: return "AddBlockLocationMessage"; + case kDeleteBlockLocationMessage: return "DeleteBlockLocationMessage"; + case kLocateBlockMessage: return "LocateBlockMessage"; + case kLocateBlockResponseMessage: return "LocateBlockResponseMessage"; + case kGetPeerDomainNetworkAddressesMessage: return "GetPeerDomainNetworkAddressesMessage"; + case kGetPeerDomainNetworkAddressesResponseMessage: return "GetPeerDomainNetworkAddressesResponseMessage"; + case kBlockDomainUnregistrationMessage: return "BlockDomainUnregistrationMessage"; +#endif // QUICKSTEP_DISTRIBUTED + default: + LOG(FATAL) << "Unknown message type"; + } + } + /** * @brief Send a TMB message to a single receiver. 
* @@ -145,8 +190,7 @@ class QueryExecutionUtil { address.All(true); tmb::TaggedMessage poison_tagged_message(kPoisonMessage); - DLOG(INFO) << "TMB client ID " << sender_id - << " broadcast PoisonMessage (typed '" << kPoisonMessage << "') to all"; + DLOG(INFO) << "Client " << sender_id << " broadcasts PoisonMessage to all"; const tmb::MessageBus::SendStatus send_status = bus->Send( sender_id, address, style, std::move(poison_tagged_message)); CHECK(send_status == tmb::MessageBus::SendStatus::kOK); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/QueryManagerDistributed.cpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp index 6c6f895..92645b6 100644 --- a/query_execution/QueryManagerDistributed.cpp +++ b/query_execution/QueryManagerDistributed.cpp @@ -182,8 +182,7 @@ bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) { kInitiateRebuildMessage); free(proto_bytes); - DLOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage (typed '" << kInitiateRebuildMessage - << "') to all Shiftbosses"; + DLOG(INFO) << "ForemanDistributed sent InitiateRebuildMessage to all Shiftbosses"; QueryExecutionUtil::BroadcastMessage(foreman_client_id_, shiftboss_addresses_, move(tagged_msg), http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/Shiftboss.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp index fa922f0..d023d84 100644 --- a/query_execution/Shiftboss.cpp +++ b/query_execution/Shiftboss.cpp @@ -154,13 +154,15 @@ void Shiftboss::run() { processShiftbossRegistrationResponseMessage(); + AnnotatedMessage annotated_message; + tmb::message_type_id message_type; for (;;) { - AnnotatedMessage annotated_message; if 
(bus_global_->ReceiveIfAvailable(shiftboss_client_id_global_, &annotated_message, 0, true)) { + message_type = annotated_message.tagged_message.message_type(); DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ - << "') received the typed '" << annotated_message.tagged_message.message_type() - << "' message from Foreman " << annotated_message.sender; - switch (annotated_message.tagged_message.message_type()) { + << "') received " << QueryExecutionUtil::MessageTypeToString(message_type) + << " from Foreman with Client " << annotated_message.sender; + switch (message_type) { case kQueryInitiateMessage: { const TaggedMessage &tagged_message = annotated_message.tagged_message; @@ -192,9 +194,8 @@ void Shiftboss::run() { kWorkOrderMessage); const size_t worker_index = getSchedulableWorker(); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_local_ - << "') forwarded WorkOrderMessage (typed '" << kWorkOrderMessage - << "') from Foreman to worker " << worker_index; + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client " << shiftboss_client_id_local_ + << " forwarded WorkOrderMessage from Foreman to Worker " << worker_index; const MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_local_, @@ -228,9 +229,8 @@ void Shiftboss::run() { break; } case kPoisonMessage: { - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ - << "') forwarded PoisonMessage (typed '" << kPoisonMessage - << "') from Foreman to all workers"; + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client " << shiftboss_client_id_global_ + << " forwarded PoisonMessage from Foreman to all Workers"; tmb::MessageStyle broadcast_style; broadcast_style.Broadcast(true); @@ -248,16 +248,17 @@ void Shiftboss::run() { } while (bus_local_->ReceiveIfAvailable(shiftboss_client_id_local_, &annotated_message, 0, true)) { - switch 
(annotated_message.tagged_message.message_type()) { + message_type = annotated_message.tagged_message.message_type(); + switch (message_type) { case kCatalogRelationNewBlockMessage: case kDataPipelineMessage: case kWorkOrderFeedbackMessage: case kWorkOrderCompleteMessage: case kRebuildWorkOrderCompleteMessage: { - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ - << "') forwarded typed '" << annotated_message.tagged_message.message_type() - << "' message from Worker with TMB client ID '" << annotated_message.sender - << "' to Foreman with TMB client ID " << foreman_client_id_; + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client " << shiftboss_client_id_global_ + << " forwarded " << QueryExecutionUtil::MessageTypeToString(message_type) + << " from Worker with Client " << annotated_message.sender + << " to Foreman with Client " << foreman_client_id_; DCHECK_NE(foreman_client_id_, tmb::kClientIdNone); const MessageBus::SendStatus send_status = @@ -310,9 +311,8 @@ void Shiftboss::registerWithForeman() { kShiftbossRegistrationMessage); free(proto_bytes); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ - << "') sent ShiftbossRegistrationMessage (typed '" << kShiftbossRegistrationMessage - << "') to all"; + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client " << shiftboss_client_id_global_ + << " sent ShiftbossRegistrationMessage to all"; tmb::MessageBus::SendStatus send_status = bus_global_->Send(shiftboss_client_id_global_, all_addresses, style, move(message)); DCHECK(send_status == tmb::MessageBus::SendStatus::kOK); @@ -324,9 +324,8 @@ void Shiftboss::processShiftbossRegistrationResponseMessage() { DCHECK_EQ(kShiftbossRegistrationResponseMessage, tagged_message.message_type()); foreman_client_id_ = annotated_message.sender; - DLOG(INFO) << "Shiftboss (id '" << shiftboss_client_id_local_ - << "') received the typed '" << kShiftbossRegistrationResponseMessage 
- << "' message from ForemanDistributed with client " << foreman_client_id_; + DLOG(INFO) << "Shiftboss with Client " << shiftboss_client_id_local_ + << " received ShiftbossRegistrationResponseMessage from Foreman with Client " << foreman_client_id_; serialization::ShiftbossRegistrationResponseMessage proto; CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); @@ -363,9 +362,8 @@ void Shiftboss::processQueryInitiateMessage( kQueryInitiateResponseMessage); free(proto_bytes); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ - << "') sent QueryInitiateResponseMessage (typed '" << kQueryInitiateResponseMessage - << "') to Foreman with TMB client ID " << foreman_client_id_; + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client " << shiftboss_client_id_global_ + << " sent QueryInitiateResponseMessage to Foreman with Client " << foreman_client_id_; const MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_global_, shiftboss_client_id_global_, @@ -402,9 +400,8 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, kInitiateRebuildResponseMessage); free(proto_bytes); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" << shiftboss_client_id_global_ - << "') sent InitiateRebuildResponseMessage (typed '" << kInitiateRebuildResponseMessage - << "') to Foreman with TMB client ID " << foreman_client_id_; + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client " << shiftboss_client_id_global_ + << " sent InitiateRebuildResponseMessage to Foreman with Client " << foreman_client_id_; const MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_global_, shiftboss_client_id_global_, @@ -431,9 +428,8 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id, kRebuildWorkOrderMessage); const size_t worker_index = getSchedulableWorker(); - DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " (id '" 
<< shiftboss_client_id_local_ - << "') sent RebuildWorkOrderMessage (typed '" << kRebuildWorkOrderMessage - << "') to worker " << worker_index; + DLOG(INFO) << "Shiftboss " << shiftboss_index_ << " with Client " << shiftboss_client_id_local_ + << " sent RebuildWorkOrderMessage to Worker " << worker_index; const MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_local_, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/Worker.cpp ---------------------------------------------------------------------- diff --git a/query_execution/Worker.cpp b/query_execution/Worker.cpp index 9a548fd..1882f2e 100644 --- a/query_execution/Worker.cpp +++ b/query_execution/Worker.cpp @@ -111,11 +111,8 @@ void Worker::sendWorkOrderCompleteMessage(const tmb::client_id receiver, #ifdef QUICKSTEP_DISTRIBUTED << " in Shiftboss " << shiftboss_index_ #endif // QUICKSTEP_DISTRIBUTED - << " sent " - << (message_type == kWorkOrderCompleteMessage ? "WorkOrderCompleteMessage" - : "RebuildWorkOrderCompleteMessage") - << " (typed '" << message_type - << "') to Scheduler with TMB client ID " << receiver; + << " sent " << QueryExecutionUtil::MessageTypeToString(message_type) + << " to Scheduler with Client " << receiver; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage( bus_, worker_client_id_, receiver, std::move(tagged_message)); http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/query_execution/tests/BlockLocator_unittest.cpp ---------------------------------------------------------------------- diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp index 32437c3..426a2c9 100644 --- a/query_execution/tests/BlockLocator_unittest.cpp +++ b/query_execution/tests/BlockLocator_unittest.cpp @@ -97,9 +97,8 @@ class BlockLocatorTest : public ::testing::Test { TaggedMessage message(kPoisonMessage); - LOG(INFO) << "Worker (id '" << 
worker_client_id_ - << "') sent PoisonMessage (typed '" << kPoisonMessage - << "') to BlockLocator (id '" << locator_client_id_ << "')"; + LOG(INFO) << "Worker with Client " << worker_client_id_ << " sent PoisonMessage to BlockLocator with Client " + << locator_client_id_; CHECK(MessageBus::SendStatus::kOK == QueryExecutionUtil::SendTMBMessage(&bus_, worker_client_id_, @@ -120,9 +119,7 @@ class BlockLocatorTest : public ::testing::Test { kLocateBlockMessage); free(proto_bytes); - LOG(INFO) << "Worker (id '" << worker_client_id_ - << "') sent LocateBlockMessage (typed '" << kLocateBlockMessage - << "') to BlockLocator"; + LOG(INFO) << "Worker with Client " << worker_client_id_ << " sent LocateBlockMessage to BlockLocator"; CHECK(MessageBus::SendStatus::kOK == QueryExecutionUtil::SendTMBMessage(&bus_, worker_client_id_, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/relational_operators/DeleteOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp index 24da9bf..14cbf6f 100644 --- a/relational_operators/DeleteOperator.cpp +++ b/relational_operators/DeleteOperator.cpp @@ -146,8 +146,7 @@ void DeleteWorkOrder::execute() { kDataPipelineMessage); std::free(proto_bytes); - DLOG(INFO) << "DeleteWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage - << "') to Scheduler with TMB client ID " << scheduler_client_id_; + DLOG(INFO) << "DeleteWorkOrder sent DataPipelineMessage to Scheduler with Client " << scheduler_client_id_; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage( bus_, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/relational_operators/RebuildWorkOrder.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/RebuildWorkOrder.hpp b/relational_operators/RebuildWorkOrder.hpp index 2cef1f1..7f0f7fc
100644 --- a/relational_operators/RebuildWorkOrder.hpp +++ b/relational_operators/RebuildWorkOrder.hpp @@ -101,8 +101,7 @@ class RebuildWorkOrder : public WorkOrder { // Refer to InsertDestination::sendBlockFilledMessage for the rationale // behind using the ClientIDMap map. - DLOG(INFO) << "RebuildWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage - << "') to Scheduler with TMB client ID " << scheduler_client_id_; + DLOG(INFO) << "RebuildWorkOrder sent DataPipelineMessage to Scheduler with Client " << scheduler_client_id_; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, ClientIDMap::Instance()->getValue(), http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/relational_operators/UpdateOperator.cpp ---------------------------------------------------------------------- diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp index 143c741..40dfb22 100644 --- a/relational_operators/UpdateOperator.cpp +++ b/relational_operators/UpdateOperator.cpp @@ -128,8 +128,7 @@ void UpdateWorkOrder::execute() { kDataPipelineMessage); std::free(proto_bytes); - DLOG(INFO) << "UpdateWorkOrder sent DataPipelineMessage (typed '" << kDataPipelineMessage - << "') to Scheduler with TMB client ID " << scheduler_client_id_; + DLOG(INFO) << "UpdateWorkOrder sent DataPipelineMessage to Scheduler with Client " << scheduler_client_id_; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage( bus_, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/relational_operators/WorkOrder.hpp ---------------------------------------------------------------------- diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp index c1b9b68..97f2a74 100644 --- a/relational_operators/WorkOrder.hpp +++ b/relational_operators/WorkOrder.hpp @@ -283,8 +283,7 @@ class WorkOrder { tmb::MessageStyle single_receiver_style; 
DCHECK(bus != nullptr); - DLOG(INFO) << "WorkOrder sent WorkOrderFeedbackMessage (typed '" << kWorkOrderFeedbackMessage - << "') to Scheduler with TMB client ID " << receiver_id; + DLOG(INFO) << "WorkOrder sent WorkOrderFeedbackMessage to Scheduler with Client " << receiver_id; const tmb::MessageBus::SendStatus send_status = bus->Send(sender_id, receiver_address, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/storage/InsertDestination.cpp ---------------------------------------------------------------------- diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp index 714e6e5..75e1217 100644 --- a/storage/InsertDestination.cpp +++ b/storage/InsertDestination.cpp @@ -360,9 +360,8 @@ MutableBlockReference AlwaysCreateBlockInsertDestination::createNewBlock() { kCatalogRelationNewBlockMessage); free(proto_bytes); - DLOG(INFO) << "AlwaysCreateBlockInsertDestination sent CatalogRelationNewBlockMessage (typed '" - << kCatalogRelationNewBlockMessage - << "') to Scheduler with TMB client ID " << scheduler_client_id_; + DLOG(INFO) << "AlwaysCreateBlockInsertDestination sent CatalogRelationNewBlockMessage to Scheduler with Client " + << scheduler_client_id_; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, thread_id_map_.getValue(), @@ -410,9 +409,8 @@ MutableBlockReference BlockPoolInsertDestination::createNewBlock() { kCatalogRelationNewBlockMessage); free(proto_bytes); - DLOG(INFO) << "BlockPoolInsertDestination sent CatalogRelationNewBlockMessage (typed '" - << kCatalogRelationNewBlockMessage - << "') to Scheduler with TMB client ID " << scheduler_client_id_; + DLOG(INFO) << "BlockPoolInsertDestination sent CatalogRelationNewBlockMessage to Scheduler with Client " + << scheduler_client_id_; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, thread_id_map_.getValue(), @@ -527,9 +525,8 @@ MutableBlockReference 
PartitionAwareInsertDestination::createNewBlockInPartition kCatalogRelationNewBlockMessage); free(proto_bytes); - DLOG(INFO) << "PartitionAwareInsertDestination sent CatalogRelationNewBlockMessage (typed '" - << kCatalogRelationNewBlockMessage - << "') to Scheduler with TMB client ID " << scheduler_client_id_; + DLOG(INFO) << "PartitionAwareInsertDestination sent CatalogRelationNewBlockMessage to Scheduler with Client " + << scheduler_client_id_; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, thread_id_map_.getValue(), http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/storage/InsertDestination.hpp ---------------------------------------------------------------------- diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp index 6707192..e9335ce 100644 --- a/storage/InsertDestination.hpp +++ b/storage/InsertDestination.hpp @@ -255,8 +255,7 @@ class InsertDestination : public InsertDestinationInterface { // option 3. 
DCHECK(bus_ != nullptr); - DLOG(INFO) << "InsertDestination sent DataPipelineMessage (typed '" << kDataPipelineMessage - << "') to Scheduler with TMB client ID " << scheduler_client_id_; + DLOG(INFO) << "InsertDestination sent DataPipelineMessage to Scheduler with Client " << scheduler_client_id_; const tmb::MessageBus::SendStatus send_status = QueryExecutionUtil::SendTMBMessage(bus_, thread_id_map_.getValue(), http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/storage/StorageManager.cpp ---------------------------------------------------------------------- diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp index 94e1b67..4410385 100644 --- a/storage/StorageManager.cpp +++ b/storage/StorageManager.cpp @@ -256,9 +256,8 @@ StorageManager::~StorageManager() { kBlockDomainUnregistrationMessage); free(proto_bytes); - LOG(INFO) << "StorageManager (id '" << storage_manager_client_id_ - << "') sent BlockDomainUnregistrationMessage (typed '" << kBlockDomainUnregistrationMessage - << "') to BlockLocator"; + LOG(INFO) << "StorageManager with Client " << storage_manager_client_id_ + << " sent BlockDomainUnregistrationMessage to BlockLocator"; CHECK(MessageBus::SendStatus::kOK == QueryExecutionUtil::SendTMBMessage(bus_, storage_manager_client_id_, @@ -483,9 +482,8 @@ void StorageManager::sendBlockDomainToShiftbossIndexMessage(const std::size_t sh kBlockDomainToShiftbossIndexMessage); free(proto_bytes); - DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_ - << "') sent BlockDomainToShiftbossIndexMessage (typed '" << kBlockDomainToShiftbossIndexMessage - << "') to BlockLocator"; + DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_ + << " sent BlockDomainToShiftbossIndexMessage to BlockLocator"; DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone); DCHECK(bus_ != nullptr); @@ -592,9 +590,8 @@ vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id bloc 
kGetPeerDomainNetworkAddressesMessage); free(proto_bytes); - DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_ - << "') sent GetPeerDomainNetworkAddressesMessage (typed '" << kGetPeerDomainNetworkAddressesMessage - << "') to BlockLocator"; + DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_ + << " sent GetPeerDomainNetworkAddressesMessage to BlockLocator"; DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone); DCHECK(bus_ != nullptr); @@ -648,9 +645,9 @@ void StorageManager::sendBlockLocationMessage(const block_id block, message_type); free(proto_bytes); - DLOG(INFO) << "StorageManager (id '" << storage_manager_client_id_ - << "') sent BlockLocationMessage (typed '" << message_type - << "') to BlockLocator"; + DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_ + << " sent " << QueryExecutionUtil::MessageTypeToString(message_type) + << " to BlockLocator"; CHECK(MessageBus::SendStatus::kOK == QueryExecutionUtil::SendTMBMessage(bus_, storage_manager_client_id_, http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/686bbb58/storage/tests/DataExchange_unittest.cpp ---------------------------------------------------------------------- diff --git a/storage/tests/DataExchange_unittest.cpp b/storage/tests/DataExchange_unittest.cpp index ac39728..d1fdef6 100644 --- a/storage/tests/DataExchange_unittest.cpp +++ b/storage/tests/DataExchange_unittest.cpp @@ -113,9 +113,8 @@ class DataExchangeTest : public ::testing::Test { TaggedMessage message(kPoisonMessage); - LOG(INFO) << "Worker (id '" << worker_client_id_ - << "') sent PoisonMessage (typed '" << kPoisonMessage - << "') to BlockLocator (id '" << locator_client_id_ << "')"; + LOG(INFO) << "Worker with Client " << worker_client_id_ + << " sent PoisonMessage to BlockLocator with Client " << locator_client_id_; CHECK(MessageBus::SendStatus::kOK == QueryExecutionUtil::SendTMBMessage(&bus_, worker_client_id_,