Added an util to get block domain from BlockLocator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/56555117 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/56555117 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/56555117 Branch: refs/heads/ethernet-data-exchanger Commit: 565551173e797c65651eef0408c74b55e27a5796 Parents: f7d1543 Author: Zuyu Zhang <zu...@apache.org> Authored: Sun Nov 20 15:17:46 2016 -0800 Committer: Zuyu Zhang <zu...@apache.org> Committed: Sun Nov 20 19:58:14 2016 -0800 ---------------------------------------------------------------------- query_execution/BlockLocatorUtil.cpp | 93 ++++++++++++++++++++ query_execution/BlockLocatorUtil.hpp | 59 +++++++++++++ query_execution/CMakeLists.txt | 9 ++ query_execution/tests/BlockLocator_unittest.cpp | 42 +-------- query_optimizer/tests/CMakeLists.txt | 4 +- .../DistributedExecutionGeneratorTestRunner.cpp | 54 +++--------- .../DistributedExecutionGeneratorTestRunner.hpp | 4 +- storage/CMakeLists.txt | 4 +- storage/tests/DataExchange_unittest.cpp | 50 ++--------- 9 files changed, 187 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_execution/BlockLocatorUtil.cpp ---------------------------------------------------------------------- diff --git a/query_execution/BlockLocatorUtil.cpp b/query_execution/BlockLocatorUtil.cpp new file mode 100644 index 0000000..d2d1e96 --- /dev/null +++ b/query_execution/BlockLocatorUtil.cpp @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#include "query_execution/BlockLocatorUtil.hpp" + +#include <cstdlib> +#include <string> +#include <utility> + +#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/QueryExecutionTypedefs.hpp" +#include "storage/StorageBlockInfo.hpp" + +#include "glog/logging.h" + +#include "tmb/address.h" +#include "tmb/id_typedefs.h" +#include "tmb/message_bus.h" +#include "tmb/message_style.h" +#include "tmb/tagged_message.h" + +using tmb::TaggedMessage; +using tmb::MessageBus; +using tmb::client_id; + +namespace quickstep { +namespace block_locator { + +namespace S = ::quickstep::serialization; + +block_id_domain getBlockDomain(const std::string &network_address, + const client_id cli_id, + client_id *locator_client_id, + MessageBus *bus) { + tmb::Address address; + address.All(true); + // NOTE(zuyu): The singleton BlockLocator would need only one copy of the message. + tmb::MessageStyle style; + + S::BlockDomainRegistrationMessage proto; + proto.set_domain_network_address(network_address); + + const int proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(std::malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), + proto_length, + kBlockDomainRegistrationMessage); + std::free(proto_bytes); + + DLOG(INFO) << "Client (id '" << cli_id + << "') broadcasts BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage + << "') to BlockLocator."; + + CHECK(MessageBus::SendStatus::kOK == + bus->Send(cli_id, address, style, std::move(message))); + + const tmb::AnnotatedMessage annotated_message(bus->Receive(cli_id, 0, true)); + const TaggedMessage &tagged_message = annotated_message.tagged_message; + CHECK_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type()); + + *locator_client_id = annotated_message.sender; + + DLOG(INFO) << "Client (id '" << cli_id + << "') received BlockDomainRegistrationResponseMessage (typed '" + << kBlockDomainRegistrationResponseMessage + << "') from BlockLocator (id '" << *locator_client_id << "')."; + + S::BlockDomainMessage response_proto; + CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + return static_cast<block_id_domain>(response_proto.block_domain()); +} + +} // namespace block_locator +} // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_execution/BlockLocatorUtil.hpp ---------------------------------------------------------------------- diff --git a/query_execution/BlockLocatorUtil.hpp b/query_execution/BlockLocatorUtil.hpp new file mode 100644 index 0000000..74f65e4 --- /dev/null +++ b/query_execution/BlockLocatorUtil.hpp @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + **/ + +#ifndef QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_UTIL_HPP_ +#define QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_UTIL_HPP_ + +#include <string> + +#include "storage/StorageBlockInfo.hpp" + +#include "tmb/id_typedefs.h" + +namespace tmb { class MessageBus; } + +namespace quickstep { +namespace block_locator { + +/** \addtogroup QueryExecution + * @{ + */ + +/** + * @brief Broadcast to find BlockLocator to get a block domain for + * StorageManager with the given network address. + * + * @param network_address The network address of the StorageManager. + * @param cli_id The client ID of the block domain requester. + * @param locator_client_id The client ID of BlockLocator to set. + * @param bus A pointer to the TMB. + * + * @return The requested block domain. + **/ +block_id_domain getBlockDomain(const std::string &network_address, + const tmb::client_id cli_id, + tmb::client_id *locator_client_id, + tmb::MessageBus *bus); + +/** @} */ + +} // namespace block_locator +} // namespace quickstep + +#endif // QUICKSTEP_QUERY_EXECUTION_BLOCK_LOCATOR_UTIL_HPP_ http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_execution/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt index eec0029..719d9f3 100644 --- a/query_execution/CMakeLists.txt +++ b/query_execution/CMakeLists.txt @@ -31,6 +31,7 @@ endif() add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp) if (ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp) + add_library(quickstep_queryexecution_BlockLocatorUtil BlockLocatorUtil.cpp BlockLocatorUtil.hpp) endif(ENABLE_DISTRIBUTED) add_library(quickstep_queryexecution_ForemanBase ../empty_src.cpp ForemanBase.hpp) if (ENABLE_DISTRIBUTED) @@ -83,6 +84,12 @@ if (ENABLE_DISTRIBUTED) quickstep_threading_ThreadUtil quickstep_utility_Macros tmb) + target_link_libraries(quickstep_queryexecution_BlockLocatorUtil + glog + quickstep_queryexecution_QueryExecutionMessages_proto + quickstep_queryexecution_QueryExecutionTypedefs + quickstep_storage_StorageBlockInfo + tmb) endif(ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution_ForemanBase glog @@ -345,6 +352,7 @@ target_link_libraries(quickstep_queryexecution if (ENABLE_DISTRIBUTED) target_link_libraries(quickstep_queryexecution quickstep_queryexecution_BlockLocator + quickstep_queryexecution_BlockLocatorUtil quickstep_queryexecution_ForemanDistributed quickstep_queryexecution_PolicyEnforcerDistributed quickstep_queryexecution_QueryManagerDistributed @@ -363,6 +371,7 @@ if (ENABLE_DISTRIBUTED) quickstep_catalog_CatalogAttribute quickstep_catalog_CatalogRelation quickstep_queryexecution_BlockLocator + quickstep_queryexecution_BlockLocatorUtil quickstep_queryexecution_QueryExecutionMessages_proto quickstep_queryexecution_QueryExecutionTypedefs quickstep_queryexecution_QueryExecutionUtil http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_execution/tests/BlockLocator_unittest.cpp ---------------------------------------------------------------------- diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp index 465f2a3..32437c3 100644 --- a/query_execution/tests/BlockLocator_unittest.cpp +++ b/query_execution/tests/BlockLocator_unittest.cpp @@ -26,6 +26,7 @@ #include "catalog/CatalogAttribute.hpp" #include "catalog/CatalogRelation.hpp" #include "query_execution/BlockLocator.hpp" +#include "query_execution/BlockLocatorUtil.hpp" #include "query_execution/QueryExecutionMessages.pb.h" #include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/QueryExecutionUtil.hpp" @@ -71,7 +72,6 @@ class BlockLocatorTest : public ::testing::Test { bus_.Initialize(); locator_.reset(new BlockLocator(&bus_)); - locator_client_id_ = locator_->getBusClientID(); locator_->start(); worker_client_id_ = bus_.Connect(); @@ -84,7 +84,9 @@ class BlockLocatorTest : public ::testing::Test { bus_.RegisterClientAsSender(worker_client_id_, kPoisonMessage); - block_domain_ = getBlockDomain(kDomainNetworkAddress); + block_domain_ = + block_locator::getBlockDomain(kDomainNetworkAddress, worker_client_id_, &locator_client_id_, &bus_); + DCHECK_EQ(locator_->getBusClientID(), locator_client_id_); storage_manager_.reset( new StorageManager(kStoragePath, block_domain_, locator_client_id_, &bus_)); @@ -168,42 +170,6 @@ class BlockLocatorTest : public ::testing::Test { unique_ptr<StorageManager> storage_manager_; private: - block_id_domain getBlockDomain(const string &network_address) { - serialization::BlockDomainRegistrationMessage proto; - proto.set_domain_network_address(network_address); - - const int proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kBlockDomainRegistrationMessage); - free(proto_bytes); - - LOG(INFO) << "Worker (id '" << worker_client_id_ - << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage - << "') to BlockLocator (id '" << locator_client_id_ << "')"; - - CHECK(MessageBus::SendStatus::kOK == - QueryExecutionUtil::SendTMBMessage(&bus_, - worker_client_id_, - locator_client_id_, - move(message))); - - const AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true)); - const TaggedMessage &tagged_message = annotated_message.tagged_message; - EXPECT_EQ(locator_client_id_, annotated_message.sender); - EXPECT_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type()); - LOG(INFO) << "Worker (id '" << worker_client_id_ - << "') received BlockDomainRegistrationResponseMessage from BlockLocator"; - - serialization::BlockDomainMessage response_proto; - CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - return static_cast<block_id_domain>(response_proto.block_domain()); - } - MessageBusImpl bus_; unique_ptr<BlockLocator> locator_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_optimizer/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/CMakeLists.txt b/query_optimizer/tests/CMakeLists.txt index 9c764e4..b987322 100644 --- a/query_optimizer/tests/CMakeLists.txt +++ b/query_optimizer/tests/CMakeLists.txt @@ -115,8 +115,8 @@ if (ENABLE_DISTRIBUTED) quickstep_parser_ParseStatement quickstep_parser_SqlParserWrapper quickstep_queryexecution_BlockLocator + quickstep_queryexecution_BlockLocatorUtil quickstep_queryexecution_ForemanDistributed - quickstep_queryexecution_QueryExecutionMessages_proto quickstep_queryexecution_QueryExecutionTypedefs quickstep_queryexecution_QueryExecutionUtil quickstep_queryexecution_Shiftboss @@ -127,7 +127,7 @@ if (ENABLE_DISTRIBUTED) quickstep_queryoptimizer_QueryHandle quickstep_queryoptimizer_tests_TestDatabaseLoader quickstep_storage_DataExchangerAsync - quickstep_storage_StorageBlockInfo + quickstep_storage_StorageManager quickstep_utility_Macros quickstep_utility_MemStream quickstep_utility_SqlError http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp index 0403e77..2351dcd 100644 --- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp +++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.cpp @@ -32,14 +32,14 @@ #include "cli/PrintToScreen.hpp" #include "parser/ParseStatement.hpp" #include "query_execution/BlockLocator.hpp" +#include "query_execution/BlockLocatorUtil.hpp" #include "query_execution/ForemanDistributed.hpp" -#include "query_execution/QueryExecutionMessages.pb.h" #include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/QueryExecutionUtil.hpp" #include "query_optimizer/OptimizerContext.hpp" #include "query_optimizer/QueryHandle.hpp" #include "storage/DataExchangerAsync.hpp" -#include "storage/StorageBlockInfo.hpp" +#include "storage/StorageManager.hpp" #include "utility/MemStream.hpp" #include "utility/SqlError.hpp" @@ -81,14 +81,15 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner bus_.RegisterClientAsReceiver(cli_id_, kBlockDomainRegistrationResponseMessage); block_locator_ = make_unique<BlockLocator>(&bus_); - locator_client_id_ = block_locator_->getBusClientID(); block_locator_->start(); test_database_loader_ = make_unique<TestDatabaseLoader>( storage_path, - getBlockDomain(test_database_loader_data_exchanger_.network_address()), + block_locator::getBlockDomain( + test_database_loader_data_exchanger_.network_address(), cli_id_, &locator_client_id_, &bus_), locator_client_id_, &bus_); + DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_); test_database_loader_data_exchanger_.set_storage_manager(test_database_loader_->storage_manager()); test_database_loader_data_exchanger_.start(); @@ -111,7 +112,11 @@ DistributedExecutionGeneratorTestRunner::DistributedExecutionGeneratorTestRunner make_unique<WorkerDirectory>(worker_client_ids.size(), worker_client_ids, numa_nodes)); auto storage_manager = make_unique<StorageManager>( - storage_path, getBlockDomain(data_exchangers_[i].network_address()), locator_client_id_, &bus_); + storage_path, + block_locator::getBlockDomain( + data_exchangers_[i].network_address(), cli_id_, &locator_client_id_, &bus_), + locator_client_id_, &bus_); + DCHECK_EQ(block_locator_->getBusClientID(), locator_client_id_); data_exchangers_[i].set_storage_manager(storage_manager.get()); shiftbosses_.push_back( @@ -193,44 +198,5 @@ void DistributedExecutionGeneratorTestRunner::runTestCase( } } -block_id_domain DistributedExecutionGeneratorTestRunner::getBlockDomain( - const string &network_address) { - serialization::BlockDomainRegistrationMessage proto; - proto.set_domain_network_address(network_address); - - const int proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kBlockDomainRegistrationMessage); - free(proto_bytes); - - DLOG(INFO) << "Client (id '" << cli_id_ - << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage - << "') to BlockLocator (id '" << locator_client_id_ << "')"; - - CHECK(MessageBus::SendStatus::kOK == - QueryExecutionUtil::SendTMBMessage(&bus_, - cli_id_, - locator_client_id_, - move(message))); - - const tmb::AnnotatedMessage annotated_message(bus_.Receive(cli_id_, 0, true)); - const TaggedMessage &tagged_message = annotated_message.tagged_message; - CHECK_EQ(locator_client_id_, annotated_message.sender); - CHECK_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type()); - DLOG(INFO) << "Client (id '" << cli_id_ - << "') received BlockDomainRegistrationResponseMessage (typed '" - << kBlockDomainRegistrationResponseMessage - << "') from BlockLocator"; - - serialization::BlockDomainMessage response_proto; - CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - return static_cast<block_id_domain>(response_proto.block_domain()); -} - } // namespace optimizer } // namespace quickstep http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp ---------------------------------------------------------------------- diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp index d2b13e4..63e320d 100644 --- a/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp +++ b/query_optimizer/tests/DistributedExecutionGeneratorTestRunner.hpp @@ -38,7 +38,7 @@ #include "query_optimizer/Optimizer.hpp" #include "query_optimizer/tests/TestDatabaseLoader.hpp" #include "storage/DataExchangerAsync.hpp" -#include "storage/StorageBlockInfo.hpp" +#include "storage/StorageManager.hpp" #include "utility/Macros.hpp" #include "utility/textbased_test/TextBasedTestRunner.hpp" @@ -115,8 +115,6 @@ class DistributedExecutionGeneratorTestRunner : public TextBasedTestRunner { std::string *output) override; private: - block_id_domain getBlockDomain(const std::string &network_address); - std::size_t query_id_; SqlParserWrapper sql_parser_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/storage/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt index be60662..559d86d 100644 --- a/storage/CMakeLists.txt +++ b/storage/CMakeLists.txt @@ -862,7 +862,7 @@ target_link_libraries(quickstep_storage_PartitionedHashTablePool quickstep_storage_FastHashTableFactory quickstep_storage_HashTableBase quickstep_utility_Macros - quickstep_utility_StringUtil) + quickstep_utility_StringUtil) target_link_libraries(quickstep_storage_PreloaderThread glog quickstep_catalog_CatalogDatabase @@ -1502,7 +1502,7 @@ if (ENABLE_DISTRIBUTED) quickstep_catalog_CatalogRelation quickstep_catalog_CatalogTypedefs quickstep_queryexecution_BlockLocator - quickstep_queryexecution_QueryExecutionMessages_proto + quickstep_queryexecution_BlockLocatorUtil quickstep_queryexecution_QueryExecutionTypedefs quickstep_queryexecution_QueryExecutionUtil quickstep_storage_CountedReference http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/56555117/storage/tests/DataExchange_unittest.cpp ---------------------------------------------------------------------- diff --git a/storage/tests/DataExchange_unittest.cpp b/storage/tests/DataExchange_unittest.cpp index 9c75150..ac39728 100644 --- a/storage/tests/DataExchange_unittest.cpp +++ b/storage/tests/DataExchange_unittest.cpp @@ -27,7 +27,7 @@ #include "catalog/CatalogRelation.hpp" #include "catalog/CatalogTypedefs.hpp" #include "query_execution/BlockLocator.hpp" -#include "query_execution/QueryExecutionMessages.pb.h" +#include "query_execution/BlockLocatorUtil.hpp" #include "query_execution/QueryExecutionTypedefs.hpp" #include "query_execution/QueryExecutionUtil.hpp" #include "storage/CountedReference.hpp" @@ -77,7 +77,6 @@ class DataExchangeTest : public ::testing::Test { bus_.Initialize(); locator_.reset(new BlockLocator(&bus_)); - locator_client_id_ = locator_->getBusClientID(); locator_->start(); worker_client_id_ = bus_.Connect(); @@ -88,18 +87,22 @@ class DataExchangeTest : public ::testing::Test { storage_manager_expected_.reset(new StorageManager( kStoragePath, - getBlockDomain(data_exchanger_expected_.network_address()), + block_locator::getBlockDomain( + data_exchanger_expected_.network_address(), worker_client_id_, &locator_client_id_, &bus_), locator_client_id_, &bus_)); + DCHECK_EQ(locator_->getBusClientID(), locator_client_id_); data_exchanger_expected_.set_storage_manager(storage_manager_expected_.get()); data_exchanger_expected_.start(); storage_manager_checked_.reset(new StorageManager( kStoragePath, - getBlockDomain(kCheckedDomainNetworkAddress), + block_locator::getBlockDomain( + kCheckedDomainNetworkAddress, worker_client_id_, &locator_client_id_, &bus_), locator_client_id_, &bus_)); + DCHECK_EQ(locator_->getBusClientID(), locator_client_id_); } virtual void TearDown() { @@ -123,45 +126,6 @@ class DataExchangeTest : public ::testing::Test { unique_ptr<StorageManager> storage_manager_expected_, storage_manager_checked_; private: - block_id_domain getBlockDomain(const string &network_address) { - serialization::BlockDomainRegistrationMessage proto; - proto.set_domain_network_address(network_address); - - const int proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kBlockDomainRegistrationMessage); - free(proto_bytes); - - LOG(INFO) << "Worker (id '" << worker_client_id_ - << "') sent BlockDomainRegistrationMessage (typed '" << kBlockDomainRegistrationMessage - << "') to BlockLocator (id '" << locator_client_id_ << "')"; - - CHECK(MessageBus::SendStatus::kOK == - QueryExecutionUtil::SendTMBMessage(&bus_, - worker_client_id_, - locator_client_id_, - move(message))); - - const tmb::AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true)); - const TaggedMessage &tagged_message = annotated_message.tagged_message; - EXPECT_EQ(locator_client_id_, annotated_message.sender); - EXPECT_EQ(kBlockDomainRegistrationResponseMessage, tagged_message.message_type()); - - LOG(INFO) << "Worker (id '" << worker_client_id_ - << "') received BlockDomainRegistrationResponseMessage (typed '" - << kBlockDomainRegistrationResponseMessage - << "') from BlockLocator"; - - serialization::BlockDomainMessage response_proto; - CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - return static_cast<block_id_domain>(response_proto.block_domain()); - } - MessageBusImpl bus_; unique_ptr<BlockLocator> locator_;