Repository: incubator-quickstep Updated Branches: refs/heads/master 256f9dd2b -> 22bac39c0
Removed unnecessary messages in BlockLocator. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/22bac39c Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/22bac39c Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/22bac39c Branch: refs/heads/master Commit: 22bac39c0c25b48ce34a3a8dc4a79c51716a4e75 Parents: 256f9dd Author: Zuyu Zhang <zu...@apache.org> Authored: Thu Mar 16 15:43:42 2017 -0700 Committer: Zuyu Zhang <zu...@apache.org> Committed: Thu Mar 16 15:43:42 2017 -0700 ---------------------------------------------------------------------- query_execution/BlockLocator.cpp | 35 -------------- query_execution/BlockLocator.hpp | 33 ++++++++------ query_execution/QueryExecutionMessages.proto | 4 -- query_execution/QueryExecutionTypedefs.hpp | 2 - query_execution/QueryExecutionUtil.hpp | 2 - query_execution/tests/BlockLocator_unittest.cpp | 48 ++------------------ 6 files changed, 24 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/BlockLocator.cpp ---------------------------------------------------------------------- diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp index 765021e..89c1c00 100644 --- a/query_execution/BlockLocator.cpp +++ b/query_execution/BlockLocator.cpp @@ -130,13 +130,6 @@ void BlockLocator::run() { } break; } - case kLocateBlockMessage: { - serialization::BlockMessage proto; - CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - processLocateBlockMessage(sender, proto.block_id()); - break; - } case kGetPeerDomainNetworkAddressesMessage: { serialization::BlockMessage proto; CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); @@ -200,34 +193,6 @@ void BlockLocator::processBlockDomainRegistrationMessage(const client_id receive move(message))); } -void BlockLocator::processLocateBlockMessage(const client_id receiver, - const block_id block) { - serialization::LocateBlockResponseMessage proto; - - // NOTE(zuyu): We don't need to protect here, as all the writers are in the - // single thread. - for (const block_id_domain domain : block_locations_[block]) { - proto.add_block_domains(domain); - } - - const int proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kLocateBlockResponseMessage); - free(proto_bytes); - - DLOG(INFO) << "BlockLocator with Client " << locator_client_id_ - << " sent LocateBlockResponseMessage to StorageManager with Client " << receiver; - CHECK(tmb::MessageBus::SendStatus::kOK == - QueryExecutionUtil::SendTMBMessage(bus_, - locator_client_id_, - receiver, - move(message))); -} - void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id receiver, const block_id block) { serialization::GetPeerDomainNetworkAddressesResponseMessage proto; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/BlockLocator.hpp ---------------------------------------------------------------------- diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp index 4690369..f5572ca 100644 --- a/query_execution/BlockLocator.hpp +++ b/query_execution/BlockLocator.hpp @@ -74,9 +74,6 @@ class BlockLocator : public Thread { bus_->RegisterClientAsReceiver(locator_client_id_, kAddBlockLocationMessage); bus_->RegisterClientAsReceiver(locator_client_id_, kDeleteBlockLocationMessage); - bus_->RegisterClientAsReceiver(locator_client_id_, kLocateBlockMessage); - bus_->RegisterClientAsSender(locator_client_id_, kLocateBlockResponseMessage); - bus_->RegisterClientAsReceiver(locator_client_id_, kGetPeerDomainNetworkAddressesMessage); bus_->RegisterClientAsSender(locator_client_id_, kGetPeerDomainNetworkAddressesResponseMessage); @@ -96,29 +93,38 @@ class BlockLocator : public Thread { } /** - * @brief Get the block locality info for scheduling in ForemanDistributed. + * @brief Get the block domain info for a given block. * * @param block The given block. - * @param shiftboss_index_for_block The index of Shiftboss that has loaded the - * block in the buffer pool. * - * @return Whether the block locality info has found. + * @return The block domain info for a given block. **/ - bool getBlockLocalityInfo(const block_id block, std::size_t *shiftboss_index_for_block) const { - std::unordered_set<block_id_domain> block_domains; + std::unordered_set<block_id_domain> getBlockDomains(const block_id block) const { { // Lock 'block_locations_shared_mutex_' as briefly as possible as a // reader. SpinSharedMutexSharedLock<false> read_lock(block_locations_shared_mutex_); const auto cit = block_locations_.find(block); if (cit != block_locations_.end()) { - block_domains = cit->second; - } else { - return false; + return cit->second; } } - { + return std::unordered_set<block_id_domain>(); + } + + /** + * @brief Get the block locality info for scheduling in ForemanDistributed. + * + * @param block The given block. + * @param shiftboss_index_for_block The index of Shiftboss that has loaded the + * block in the buffer pool. + * + * @return Whether the block locality info has found. + **/ + bool getBlockLocalityInfo(const block_id block, std::size_t *shiftboss_index_for_block) const { + const std::unordered_set<block_id_domain> block_domains = getBlockDomains(block); + if (!block_domains.empty()) { // NOTE(zuyu): This lock is held for the rest duration of this call, as the // exclusive case is rare. SpinSharedMutexSharedLock<false> read_lock(block_domain_to_shiftboss_index_shared_mutex_); @@ -140,7 +146,6 @@ class BlockLocator : public Thread { private: void processBlockDomainRegistrationMessage(const tmb::client_id receiver, const std::string &network_address); - void processLocateBlockMessage(const tmb::client_id receiver, const block_id block); void processGetPeerDomainNetworkAddressesMessage(const tmb::client_id receiver, const block_id block); tmb::MessageBus *bus_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index e8f102a..c70b339 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -164,10 +164,6 @@ message BlockMessage { required fixed64 block_id = 1; } -message LocateBlockResponseMessage { - repeated uint32 block_domains = 1; -} - message GetPeerDomainNetworkAddressesResponseMessage { repeated string domain_network_addresses = 1; } http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/QueryExecutionTypedefs.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp index afdac92..c56bcfd 100644 --- a/query_execution/QueryExecutionTypedefs.hpp +++ b/query_execution/QueryExecutionTypedefs.hpp @@ -115,8 +115,6 @@ enum QueryExecutionMessageType : message_type_id { kBlockDomainToShiftbossIndexMessage, // From StorageManager to BlockLocator. kAddBlockLocationMessage, // From StorageManager to BlockLocator. kDeleteBlockLocationMessage, // From StorageManager to BlockLocator. - kLocateBlockMessage, // From StorageManager to BlockLocator. - kLocateBlockResponseMessage, // From BlockLocator to StorageManager. kGetPeerDomainNetworkAddressesMessage, // From StorageManager to BlockLocator. kGetPeerDomainNetworkAddressesResponseMessage, // From BlockLocator to StorageManager. kBlockDomainUnregistrationMessage, // From StorageManager to BlockLocator. http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/QueryExecutionUtil.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp index 3f74af3..1388426 100644 --- a/query_execution/QueryExecutionUtil.hpp +++ b/query_execution/QueryExecutionUtil.hpp @@ -83,8 +83,6 @@ class QueryExecutionUtil { case kBlockDomainToShiftbossIndexMessage: return "BlockDomainToShiftbossIndexMessage"; case kAddBlockLocationMessage: return "AddBlockLocationMessage"; case kDeleteBlockLocationMessage: return "DeleteBlockLocationMessage"; - case kLocateBlockMessage: return "LocateBlockMessage"; - case kLocateBlockResponseMessage: return "LocateBlockResponseMessage"; case kGetPeerDomainNetworkAddressesMessage: return "GetPeerDomainNetworkAddressesMessage"; case kGetPeerDomainNetworkAddressesResponseMessage: return "GetPeerDomainNetworkAddressesResponseMessage"; case kBlockDomainUnregistrationMessage: return "BlockDomainUnregistrationMessage"; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/22bac39c/query_execution/tests/BlockLocator_unittest.cpp ---------------------------------------------------------------------- diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp index b73c2f7..1a7cf17 100644 --- a/query_execution/tests/BlockLocator_unittest.cpp +++ b/query_execution/tests/BlockLocator_unittest.cpp @@ -21,6 +21,7 @@ #include <memory> #include <string> #include <utility> +#include <unordered_set> #include <vector> #include "catalog/CatalogAttribute.hpp" @@ -51,6 +52,7 @@ using std::malloc; using std::move; using std::string; using std::unique_ptr; +using std::unordered_set; using std::vector; using tmb::AnnotatedMessage; @@ -79,9 +81,6 @@ class BlockLocatorTest : public ::testing::Test { bus_.RegisterClientAsSender(worker_client_id_, kBlockDomainRegistrationMessage); bus_.RegisterClientAsReceiver(worker_client_id_, kBlockDomainRegistrationResponseMessage); - bus_.RegisterClientAsSender(worker_client_id_, kLocateBlockMessage); - bus_.RegisterClientAsReceiver(worker_client_id_, kLocateBlockResponseMessage); - bus_.RegisterClientAsSender(worker_client_id_, kPoisonMessage); block_domain_ = @@ -106,58 +105,21 @@ class BlockLocatorTest : public ::testing::Test { move(message))); } - vector<block_id_domain> getPeerDomains(const block_id block) { - serialization::BlockMessage proto; - proto.set_block_id(block); - - const int proto_length = proto.ByteSize(); - char *proto_bytes = static_cast<char*>(malloc(proto_length)); - CHECK(proto.SerializeToArray(proto_bytes, proto_length)); - - TaggedMessage message(static_cast<const void*>(proto_bytes), - proto_length, - kLocateBlockMessage); - free(proto_bytes); - - LOG(INFO) << "Worker wth Client " << worker_client_id_ << " sent LocateBlockMessage to BlockLocator"; - CHECK(MessageBus::SendStatus::kOK == - QueryExecutionUtil::SendTMBMessage(&bus_, - worker_client_id_, - locator_client_id_, - move(message))); - - const AnnotatedMessage annotated_message(bus_.Receive(worker_client_id_, 0, true)); - const TaggedMessage &tagged_message = annotated_message.tagged_message; - CHECK_EQ(kLocateBlockResponseMessage, tagged_message.message_type()); - LOG(INFO) << "Worker with Client " << worker_client_id_ - << " received LocateBlockResponseMessage from BlockLocator"; - - serialization::LocateBlockResponseMessage response_proto; - CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); - - vector<block_id_domain> domains; - for (int i = 0; i < response_proto.block_domains_size(); ++i) { - domains.push_back(response_proto.block_domains(i)); - } - - return domains; - } - void checkLoaded(const block_id block) { const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block); EXPECT_EQ(1u, peer_domain_network_addresses.size()); EXPECT_STREQ(kDomainNetworkAddress, peer_domain_network_addresses[0].data()); - const vector<block_id_domain> domains = getPeerDomains(block); + const unordered_set<block_id_domain> domains = locator_->getBlockDomains(block); EXPECT_EQ(1u, domains.size()); - EXPECT_EQ(block_domain_, domains[0]); + EXPECT_EQ(1u, domains.count(block_domain_)); } void checkEvicted(const block_id block) { const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block); EXPECT_TRUE(peer_domain_network_addresses.empty()); - const vector<block_id_domain> domains = getPeerDomains(block); + const unordered_set<block_id_domain> domains = locator_->getBlockDomains(block); EXPECT_TRUE(domains.empty()); }