Repository: incubator-quickstep
Updated Branches:
  refs/heads/dist-pull-retry [created] c2b200f84
Retry pulling if RPC fails in the distributed version. Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/c2b200f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/c2b200f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/c2b200f8 Branch: refs/heads/dist-pull-retry Commit: c2b200f848d9528fe7dc826bfa1764b1a2d5bcd3 Parents: 256f9dd Author: Zuyu Zhang <zu...@apache.org> Authored: Mon Mar 13 22:42:54 2017 -0700 Committer: Zuyu Zhang <zu...@apache.org> Committed: Mon Mar 13 22:42:54 2017 -0700 ---------------------------------------------------------------------- storage/StorageManager.cpp | 59 +++++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c2b200f8/storage/StorageManager.cpp ---------------------------------------------------------------------- diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp index c70eafa..ad7bd9d 100644 --- a/storage/StorageManager.cpp +++ b/storage/StorageManager.cpp @@ -528,25 +528,21 @@ bool StorageManager::DataExchangerClientAsync::Pull(const block_id block, grpc::CompletionQueue queue; - unique_ptr<grpc::ClientAsyncResponseReader<PullResponse>> rpc( - stub_->AsyncPull(&context, request, &queue)); - PullResponse response; grpc::Status status; - rpc->Finish(&response, &status, reinterpret_cast<void*>(1)); - - void *got_tag; - bool ok = false; + do { + unique_ptr<grpc::ClientAsyncResponseReader<PullResponse>> rpc( + stub_->AsyncPull(&context, request, &queue)); + rpc->Finish(&response, &status, reinterpret_cast<void*>(1)); - queue.Next(&got_tag, &ok); - CHECK(got_tag == reinterpret_cast<void*>(1)); - CHECK(ok); + void *got_tag = nullptr; + bool ok = false; - if (!status.ok()) { - LOG(ERROR) << 
"DataExchangerClientAsync Pull error: RPC failed"; - return false; - } + queue.Next(&got_tag, &ok); + DCHECK(got_tag == reinterpret_cast<void*>(1)); + DCHECK(ok); + } while (!status.ok()); if (!response.is_valid()) { LOG(INFO) << "The pulling block not found in all the peers"; @@ -663,10 +659,21 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob( // already loaded before this function gets called. BlockHandle loaded_handle; -#ifdef QUICKSTEP_DISTRIBUTED // TODO(quickstep-team): Use a cost model to determine whether to load from // a remote peer or the disk. - if (BlockIdUtil::Domain(block) != block_domain_) { + const size_t num_slots = file_manager_->numSlots(block); + if (num_slots != 0) { + void *block_buffer = allocateSlots(num_slots, numa_node); + + const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots); + CHECK(status) << "Failed to read block from persistent storage: " << block; + + loaded_handle.block_memory = block_buffer; + loaded_handle.block_memory_size = num_slots; + } else { + bool pull_succeeded = false; + +#ifdef QUICKSTEP_DISTRIBUTED DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer"; const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block); for (const string &peer_domain_network_address : peer_domain_network_addresses) { @@ -675,25 +682,15 @@ StorageManager::BlockHandle StorageManager::loadBlockOrBlob( this); if (client.Pull(block, numa_node, &loaded_handle)) { - sendBlockLocationMessage(block, kAddBlockLocationMessage); - return loaded_handle; + pull_succeeded = true; + break; } } - - DLOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block) - << " from remote peers, so try to load from disk."; - } #endif - const size_t num_slots = file_manager_->numSlots(block); - DEBUG_ASSERT(num_slots != 0); - void *block_buffer = allocateSlots(num_slots, numa_node); - - const bool status = 
file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots); - CHECK(status) << "Failed to read block from persistent storage: " << block; - - loaded_handle.block_memory = block_buffer; - loaded_handle.block_memory_size = num_slots; + CHECK(pull_succeeded) + << "Failed to pull Block " << BlockIdUtil::ToString(block) << " from remote peers."; + } #ifdef QUICKSTEP_DISTRIBUTED if (bus_) {