Repository: incubator-quickstep Updated Branches: refs/heads/refactor-data-exchange [created] 9fae916de
Refactored the data exchange process. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9fae916d Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9fae916d Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9fae916d Branch: refs/heads/refactor-data-exchange Commit: 9fae916de2a5efe97d1050d99340f59c5028df63 Parents: 256f9dd Author: Zuyu Zhang <zu...@apache.org> Authored: Wed Mar 15 02:39:02 2017 -0700 Committer: Zuyu Zhang <zu...@apache.org> Committed: Wed Mar 15 02:39:02 2017 -0700 ---------------------------------------------------------------------- query_execution/BlockLocator.cpp | 28 +++++++ query_execution/BlockLocator.hpp | 4 + query_execution/QueryExecutionMessages.proto | 4 + query_execution/QueryExecutionTypedefs.hpp | 2 + query_execution/QueryExecutionUtil.hpp | 2 + storage/StorageManager.cpp | 99 +++++++++++++++++------ storage/StorageManager.hpp | 13 +++ 7 files changed, 127 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/query_execution/BlockLocator.cpp ---------------------------------------------------------------------- diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp index 765021e..94a0263 100644 --- a/query_execution/BlockLocator.cpp +++ b/query_execution/BlockLocator.cpp @@ -137,6 +137,10 @@ void BlockLocator::run() { processLocateBlockMessage(sender, proto.block_id()); break; } + case kGetAllDomainNetworkAddressesMessage: { + processGetAllDomainNetworkAddressesMessage(sender); + break; + } case kGetPeerDomainNetworkAddressesMessage: { serialization::BlockMessage proto; CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); @@ -228,6 +232,30 @@ void BlockLocator::processLocateBlockMessage(const client_id receiver, move(message))); } +void BlockLocator::processGetAllDomainNetworkAddressesMessage(const client_id receiver) { + serialization::GetAllDomainNetworkAddressesResponseMessage proto; + + // NOTE(zuyu): We don't need to protect here, as all the writers are in the + // single thread. + for (const auto &domain_network_address_pair : domain_network_addresses_) { + (*proto.mutable_domain_network_addresses())[domain_network_address_pair.first] = + domain_network_address_pair.second; + } + + const int proto_length = proto.ByteSize(); + char *proto_bytes = static_cast<char*>(malloc(proto_length)); + CHECK(proto.SerializeToArray(proto_bytes, proto_length)); + + TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length, + kGetAllDomainNetworkAddressesResponseMessage); + free(proto_bytes); + + DLOG(INFO) << "BlockLocator with Client " << locator_client_id_ + << " sent GetAllDomainNetworkAddressesResponseMessage to StorageManager with Client " << receiver; + CHECK(tmb::MessageBus::SendStatus::kOK == + QueryExecutionUtil::SendTMBMessage(bus_, locator_client_id_, receiver, move(message))); +} + void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id receiver, const block_id block) { serialization::GetPeerDomainNetworkAddressesResponseMessage proto; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/query_execution/BlockLocator.hpp ---------------------------------------------------------------------- diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp index 4690369..fd4b9ee 100644 --- a/query_execution/BlockLocator.hpp +++ b/query_execution/BlockLocator.hpp @@ -80,6 +80,9 @@ class BlockLocator : public Thread { bus_->RegisterClientAsReceiver(locator_client_id_, kGetPeerDomainNetworkAddressesMessage); bus_->RegisterClientAsSender(locator_client_id_, kGetPeerDomainNetworkAddressesResponseMessage); + bus_->RegisterClientAsReceiver(locator_client_id_, kGetAllDomainNetworkAddressesMessage); + bus_->RegisterClientAsSender(locator_client_id_, kGetAllDomainNetworkAddressesResponseMessage); + bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainUnregistrationMessage); bus_->RegisterClientAsReceiver(locator_client_id_, kPoisonMessage); } @@ -141,6 +144,7 @@ class BlockLocator : public Thread { private: void processBlockDomainRegistrationMessage(const tmb::client_id receiver, const std::string &network_address); void processLocateBlockMessage(const tmb::client_id receiver, const block_id block); + void processGetAllDomainNetworkAddressesMessage(const tmb::client_id receiver); void processGetPeerDomainNetworkAddressesMessage(const tmb::client_id receiver, const block_id block); tmb::MessageBus *bus_; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/query_execution/QueryExecutionMessages.proto ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto index e8f102a..60c4be8 100644 --- a/query_execution/QueryExecutionMessages.proto +++ b/query_execution/QueryExecutionMessages.proto @@ -171,3 +171,7 @@ message LocateBlockResponseMessage { message GetPeerDomainNetworkAddressesResponseMessage { repeated string domain_network_addresses = 1; } + +message GetAllDomainNetworkAddressesResponseMessage { + map<uint32, string> domain_network_addresses = 1; +} http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/query_execution/QueryExecutionTypedefs.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp index afdac92..7c2cd1b 100644 --- a/query_execution/QueryExecutionTypedefs.hpp +++ b/query_execution/QueryExecutionTypedefs.hpp @@ -119,6 +119,8 @@ enum QueryExecutionMessageType : message_type_id { kLocateBlockResponseMessage, // From BlockLocator to StorageManager. kGetPeerDomainNetworkAddressesMessage, // From StorageManager to BlockLocator. kGetPeerDomainNetworkAddressesResponseMessage, // From BlockLocator to StorageManager. + kGetAllDomainNetworkAddressesMessage, // From StorageManager to BlockLocator. + kGetAllDomainNetworkAddressesResponseMessage, // From BlockLocator to StorageManager. kBlockDomainUnregistrationMessage, // From StorageManager to BlockLocator. #endif }; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/query_execution/QueryExecutionUtil.hpp ---------------------------------------------------------------------- diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp index 3f74af3..6494f62 100644 --- a/query_execution/QueryExecutionUtil.hpp +++ b/query_execution/QueryExecutionUtil.hpp @@ -85,6 +85,8 @@ class QueryExecutionUtil { case kDeleteBlockLocationMessage: return "DeleteBlockLocationMessage"; case kLocateBlockMessage: return "LocateBlockMessage"; case kLocateBlockResponseMessage: return "LocateBlockResponseMessage"; + case kGetAllDomainNetworkAddressesMessage: return "GetAllDomainNetworkAddressesMessage"; + case kGetAllDomainNetworkAddressesResponseMessage: return "GetAllDomainNetworkAddressesResponseMessage"; case kGetPeerDomainNetworkAddressesMessage: return "GetPeerDomainNetworkAddressesMessage"; case kGetPeerDomainNetworkAddressesResponseMessage: return "GetPeerDomainNetworkAddressesResponseMessage"; case kBlockDomainUnregistrationMessage: return "BlockDomainUnregistrationMessage"; http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/storage/StorageManager.cpp ---------------------------------------------------------------------- diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp index c70eafa..a06301b 100644 --- a/storage/StorageManager.cpp +++ b/storage/StorageManager.cpp @@ -224,6 +224,9 @@ StorageManager::StorageManager( if (bus_) { storage_manager_client_id_ = bus_->Connect(); + bus_->RegisterClientAsSender(storage_manager_client_id_, kGetAllDomainNetworkAddressesMessage); + bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetAllDomainNetworkAddressesResponseMessage); + bus_->RegisterClientAsSender(storage_manager_client_id_, kGetPeerDomainNetworkAddressesMessage); bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetPeerDomainNetworkAddressesResponseMessage); @@ -619,6 +622,56 @@ vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id bloc return domain_network_addresses; } +string StorageManager::getPeerDomainNetworkAddress(const block_id_domain block_domain) { + { + SpinSharedMutexSharedLock<false> read_lock(block_domain_network_addresses_shared_mutex_); + const auto cit = block_domain_network_addresses_.find(block_domain); + if (cit != block_domain_network_addresses_.end()) { + return cit->second; + } + } + + { + SpinSharedMutexExclusiveLock<false> write_lock(block_domain_network_addresses_shared_mutex_); + + // Check one more time if the block domain network info got set up by someone else. + auto cit = block_domain_network_addresses_.find(block_domain); + if (cit != block_domain_network_addresses_.end()) { + return cit->second; + } + + DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_ + << " sent GetAllDomainNetworkAddressesMessage to BlockLocator"; + + DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone); + DCHECK(bus_ != nullptr); + CHECK(MessageBus::SendStatus::kOK == + QueryExecutionUtil::SendTMBMessage(bus_, storage_manager_client_id_, block_locator_client_id_, + TaggedMessage(kGetAllDomainNetworkAddressesMessage))); + + const tmb::AnnotatedMessage annotated_message(bus_->Receive(storage_manager_client_id_, 0, true)); + const TaggedMessage &tagged_message = annotated_message.tagged_message; + CHECK_EQ(block_locator_client_id_, annotated_message.sender); + CHECK_EQ(kGetAllDomainNetworkAddressesResponseMessage, tagged_message.message_type()); + DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_ + << " received GetAllDomainNetworkAddressesResponseMessage from BlockLocator"; + + serialization::GetAllDomainNetworkAddressesResponseMessage proto; + CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes())); + + for (const auto &domain_network_address_pair : proto.domain_network_addresses()) { + const block_id_domain block_domain = domain_network_address_pair.first; + if (block_domain_network_addresses_.find(block_domain) == block_domain_network_addresses_.end()) { + block_domain_network_addresses_.emplace(block_domain, domain_network_address_pair.second); + } + } + + cit = block_domain_network_addresses_.find(block_domain); + DCHECK(cit != block_domain_network_addresses_.end()); + return cit->second; + } +} + void StorageManager::sendBlockLocationMessage(const block_id block, const tmb::message_type_id message_type) { switch (message_type) { @@ -663,37 +716,33 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob( // already loaded before this function gets called. BlockHandle loaded_handle; -#ifdef QUICKSTEP_DISTRIBUTED // TODO(quickstep-team): Use a cost model to determine whether to load from // a remote peer or the disk. - if (BlockIdUtil::Domain(block) != block_domain_) { - DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer"; - const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block); - for (const string &peer_domain_network_address : peer_domain_network_addresses) { - DataExchangerClientAsync client( - grpc::CreateChannel(peer_domain_network_address, grpc::InsecureChannelCredentials()), - this); - - if (client.Pull(block, numa_node, &loaded_handle)) { - sendBlockLocationMessage(block, kAddBlockLocationMessage); - return loaded_handle; - } - } + const size_t num_slots = file_manager_->numSlots(block); + if (num_slots != 0) { + void *block_buffer = allocateSlots(num_slots, numa_node); - DLOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block) - << " from remote peers, so try to load from disk."; - } -#endif + const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots); + CHECK(status) << "Failed to read block from persistent storage: " << block; - const size_t num_slots = file_manager_->numSlots(block); - DEBUG_ASSERT(num_slots != 0); - void *block_buffer = allocateSlots(num_slots, numa_node); + loaded_handle.block_memory = block_buffer; + loaded_handle.block_memory_size = num_slots; + } else { + bool pull_succeeded = false; - const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots); - CHECK(status) << "Failed to read block from persistent storage: " << block; +#ifdef QUICKSTEP_DISTRIBUTED + DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer"; + DataExchangerClientAsync client( + grpc::CreateChannel(getPeerDomainNetworkAddress(BlockIdUtil::Domain(block)), + grpc::InsecureChannelCredentials()), + this); + if (client.Pull(block, numa_node, &loaded_handle)) { + pull_succeeded = true; + } +#endif - loaded_handle.block_memory = block_buffer; - loaded_handle.block_memory_size = num_slots; + CHECK(pull_succeeded) << "Failed to pull Block " << BlockIdUtil::ToString(block) << " from remote peers."; + } #ifdef QUICKSTEP_DISTRIBUTED if (bus_) { http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9fae916d/storage/StorageManager.hpp ---------------------------------------------------------------------- diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp index dc4b7e8..c40e1bf 100644 --- a/storage/StorageManager.hpp +++ b/storage/StorageManager.hpp @@ -456,6 +456,15 @@ class StorageManager { std::vector<std::string> getPeerDomainNetworkAddresses(const block_id block); /** + * @brief Get the network info of the given block domain. + * + * @param block_domain The domain of block or blob to pull. + * + * @return The network info of the given block domain. + **/ + std::string getPeerDomainNetworkAddress(const block_id_domain block_domain); + + /** * @brief Update the block location info in BlockLocator. * * @param block The given block or blob. @@ -615,6 +624,10 @@ class StorageManager { std::unordered_map<block_id, BlockHandle> blocks_; alignas(kCacheLineBytes) mutable SpinSharedMutex<false> blocks_shared_mutex_; + // Used to pull a remote block. + std::unordered_map<block_id_domain, std::string> block_domain_network_addresses_; + alignas(kCacheLineBytes) mutable SpinSharedMutex<false> block_domain_network_addresses_shared_mutex_; + // This lock manager is used with the following contract: // (1) A block cannot be evicted unless an exclusive lock is held on its // lock shard.