This is an automated email from the ASF dual-hosted git repository.
junchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/master by this push:
new 7d478836 add GetRequestFromReplica interface (#133)
7d478836 is described below
commit 7d4788360083ad8497d7b4ffb1760a098de3227a
Author: cjcchen <[email protected]>
AuthorDate: Tue Jan 30 06:44:38 2024 +0800
add GetRequestFromReplica interface (#133)
* add GetRequestFromReplica interface
* fix test case failed
---------
Co-authored-by: JunchaoChen <[email protected]>
---
interface/common/resdb_txn_accessor.cpp | 34 ++++++++++++++++++++++++++++
interface/common/resdb_txn_accessor.h | 2 ++
interface/common/resdb_txn_accessor_test.cpp | 5 ++--
3 files changed, 39 insertions(+), 2 deletions(-)
diff --git a/interface/common/resdb_txn_accessor.cpp
b/interface/common/resdb_txn_accessor.cpp
index eda1ddf9..532d432a 100644
--- a/interface/common/resdb_txn_accessor.cpp
+++ b/interface/common/resdb_txn_accessor.cpp
@@ -116,6 +116,40 @@ ResDBTxnAccessor::GetTxn(uint64_t min_seq, uint64_t
max_seq) {
return txn_resp;
}
+absl::StatusOr<std::vector<Request>> ResDBTxnAccessor::GetRequestFromReplica(
+ uint64_t min_seq, uint64_t max_seq, const ReplicaInfo& replica) {
+ QueryRequest request;
+ request.set_min_seq(min_seq);
+ request.set_max_seq(max_seq);
+
+ std::unique_ptr<NetChannel> client =
+ GetNetChannel(replica.ip(), replica.port());
+
+ std::string response_str;
+ int ret = client->SendRequest(request, Request::TYPE_QUERY);
+ if (ret) {
+ return absl::InternalError("send data fail.");
+ }
+ client->SetRecvTimeout(1000);
+ ret = client->RecvRawMessageStr(&response_str);
+ if (ret) {
+ return absl::InternalError("recv data fail.");
+ }
+
+ QueryResponse resp;
+
+ if (!resp.ParseFromString(response_str)) {
+ LOG(ERROR) << "parse fail len:" << response_str.size();
+ return absl::InternalError("recv data fail.");
+ }
+
+ std::vector<Request> txn_resp;
+ for (auto& transaction : resp.transactions()) {
+ txn_resp.push_back(transaction);
+ }
+ return txn_resp;
+}
+
absl::StatusOr<uint64_t> ResDBTxnAccessor::GetBlockNumbers() {
QueryRequest request;
request.set_min_seq(0);
diff --git a/interface/common/resdb_txn_accessor.h
b/interface/common/resdb_txn_accessor.h
index ecfb5552..6f1e1fd9 100644
--- a/interface/common/resdb_txn_accessor.h
+++ b/interface/common/resdb_txn_accessor.h
@@ -37,6 +37,8 @@ class ResDBTxnAccessor {
virtual absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn(
uint64_t min_seq, uint64_t max_seq);
+ virtual absl::StatusOr<std::vector<Request>> GetRequestFromReplica(
+ uint64_t min_seq, uint64_t max_seq, const ReplicaInfo& replica);
virtual absl::StatusOr<uint64_t> GetBlockNumbers();
protected:
diff --git a/interface/common/resdb_txn_accessor_test.cpp
b/interface/common/resdb_txn_accessor_test.cpp
index b85e9e50..e0b4925e 100644
--- a/interface/common/resdb_txn_accessor_test.cpp
+++ b/interface/common/resdb_txn_accessor_test.cpp
@@ -32,6 +32,7 @@ namespace {
using ::resdb::testing::EqualsProto;
using ::testing::_;
+using ::testing::AtLeast;
using ::testing::ElementsAre;
using ::testing::Invoke;
using ::testing::Pointee;
@@ -61,9 +62,9 @@ TEST(ResDBTxnAccessorTest, GetTransactionsFail) {
auto client = std::make_unique<MockNetChannel>(ip, port);
EXPECT_CALL(*client,
SendRequest(EqualsProto(request), Request::TYPE_QUERY, _))
- .WillOnce(Return(0));
+ .Times(AtLeast(1)).WillRepeatedly(Return(0));
EXPECT_CALL(*client, RecvRawMessageStr)
- .WillOnce(Invoke([&](std::string* resp) { return -1; }));
+ .Times(AtLeast(1)).WillRepeatedly(Invoke([&](std::string* resp) {
return -1; }));
return client;
}));
absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> resp =