This is an automated email from the ASF dual-hosted git repository. junchao pushed a commit to branch txn_interface in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
commit 9835ff5eb071622262ab9a1baaf7928b5ae55856 Author: JunchaoChen <[email protected]> AuthorDate: Mon Jan 29 22:22:41 2024 +0000 add GetRequestFromReplica interface --- interface/common/resdb_txn_accessor.cpp | 34 +++++++++++++++++++++++++++++++++ interface/common/resdb_txn_accessor.h | 2 ++ 2 files changed, 36 insertions(+) diff --git a/interface/common/resdb_txn_accessor.cpp b/interface/common/resdb_txn_accessor.cpp index eda1ddf9..532d432a 100644 --- a/interface/common/resdb_txn_accessor.cpp +++ b/interface/common/resdb_txn_accessor.cpp @@ -116,6 +116,40 @@ ResDBTxnAccessor::GetTxn(uint64_t min_seq, uint64_t max_seq) { return txn_resp; } +absl::StatusOr<std::vector<Request>> ResDBTxnAccessor::GetRequestFromReplica( + uint64_t min_seq, uint64_t max_seq, const ReplicaInfo& replica) { + QueryRequest request; + request.set_min_seq(min_seq); + request.set_max_seq(max_seq); + + std::unique_ptr<NetChannel> client = + GetNetChannel(replica.ip(), replica.port()); + + std::string response_str; + int ret = client->SendRequest(request, Request::TYPE_QUERY); + if (ret) { + return absl::InternalError("send data fail."); + } + client->SetRecvTimeout(1000); + ret = client->RecvRawMessageStr(&response_str); + if (ret) { + return absl::InternalError("recv data fail."); + } + + QueryResponse resp; + + if (!resp.ParseFromString(response_str)) { + LOG(ERROR) << "parse fail len:" << response_str.size(); + return absl::InternalError("recv data fail."); + } + + std::vector<Request> txn_resp; + for (auto& transaction : resp.transactions()) { + txn_resp.push_back(transaction); + } + return txn_resp; +} + absl::StatusOr<uint64_t> ResDBTxnAccessor::GetBlockNumbers() { QueryRequest request; request.set_min_seq(0); diff --git a/interface/common/resdb_txn_accessor.h b/interface/common/resdb_txn_accessor.h index ecfb5552..6f1e1fd9 100644 --- a/interface/common/resdb_txn_accessor.h +++ b/interface/common/resdb_txn_accessor.h @@ -37,6 +37,8 @@ class ResDBTxnAccessor { virtual absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( uint64_t min_seq, uint64_t max_seq); + virtual absl::StatusOr<std::vector<Request>> GetRequestFromReplica( + uint64_t min_seq, uint64_t max_seq, const ReplicaInfo& replica); virtual absl::StatusOr<uint64_t> GetBlockNumbers(); protected:
