This is an automated email from the ASF dual-hosted git repository.
junchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/master by this push:
new dfdc9b62 Poe fix response (#225)
dfdc9b62 is described below
commit dfdc9b6217c09e59000f66f6b3784818153fa1d1
Author: cjcchen <[email protected]>
AuthorDate: Wed Jan 21 13:25:48 2026 +0800
Poe fix response (#225)
* change reponse num for poe
* change reponse num for poe
* add files
* add files
---
.../common/framework/performance_manager.cpp | 6 +++-
.../common/framework/performance_manager.h | 1 +
.../ordering/pbft/checkpoint_manager_test.cpp | 12 ++++---
.../consensus/ordering/pbft/commitment_test.cpp | 3 +-
platform/consensus/ordering/pbft/query_test.cpp | 3 +-
platform/consensus/ordering/poe/framework/BUILD | 11 ++++++
.../consensus/ordering/poe/framework/consensus.cpp | 14 ++++++++
.../consensus/ordering/poe/framework/consensus.h | 3 ++
.../{consensus.h => performance_manager.cpp} | 40 ++++++++--------------
.../{consensus.h => performance_manager.h} | 32 ++++++-----------
10 files changed, 71 insertions(+), 54 deletions(-)
diff --git
a/platform/consensus/ordering/common/framework/performance_manager.cpp
b/platform/consensus/ordering/common/framework/performance_manager.cpp
index c07088f1..ebaf1d6a 100644
--- a/platform/consensus/ordering/common/framework/performance_manager.cpp
+++ b/platform/consensus/ordering/common/framework/performance_manager.cpp
@@ -69,6 +69,10 @@ PerformanceManager::~PerformanceManager() {
int PerformanceManager::GetPrimary() { return primary_; }
+int PerformanceManager::NeedResponse() {
+ return config_.GetMinClientReceiveNum(); // f+1;
+}
+
std::unique_ptr<Request> PerformanceManager::GenerateUserRequest() {
std::unique_ptr<Request> request = std::make_unique<Request>();
request->set_data(data_func_());
@@ -154,7 +158,7 @@ CollectorResultCode PerformanceManager::AddResponseMsg(
return CollectorResultCode::OK;
}
response_[idx][seq]++;
- if (response_[idx][seq] >= config_.GetMinClientReceiveNum()) {
+ if (response_[idx][seq] >= NeedResponse()) {
response_[idx].erase(response_[idx].find(seq));
done = true;
}
diff --git a/platform/consensus/ordering/common/framework/performance_manager.h
b/platform/consensus/ordering/common/framework/performance_manager.h
index a34f0573..c2dce102 100644
--- a/platform/consensus/ordering/common/framework/performance_manager.h
+++ b/platform/consensus/ordering/common/framework/performance_manager.h
@@ -46,6 +46,7 @@ class PerformanceManager {
protected:
virtual void SendMessage(const Request& request);
+ virtual int NeedResponse();
private:
// Add response messages which will be sent back to the caller
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
b/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
index 0f7ebc88..e13746c4 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
@@ -108,7 +108,8 @@ class CheckPointManagerTest : public Test {
TEST_F(CheckPointManagerTest, SendCheckPoint) {
config_.SetViewchangeCommitTimeout(100);
SystemInfo sys_info;
- CheckPointManager manager(config_, &replica_communicator_, nullptr,
&sys_info);
+ CheckPointManager manager(config_, &replica_communicator_, nullptr,
+ &sys_info);
for (int i = 1; i <= 5; ++i) {
std::unique_ptr<Request> request = std::make_unique<Request>();
@@ -135,7 +136,8 @@ TEST_F(CheckPointManagerTest, SendCheckPointOnce) {
}));
SystemInfo sys_info;
- CheckPointManager manager(config_, &replica_communicator_, nullptr,
&sys_info);
+ CheckPointManager manager(config_, &replica_communicator_, nullptr,
+ &sys_info);
for (int i = 1; i <= 5; ++i) {
std::unique_ptr<Request> request = std::make_unique<Request>();
request->set_seq(i);
@@ -163,7 +165,8 @@ TEST_F(CheckPointManagerTest, SendCheckPointTwo) {
}));
SystemInfo sys_info;
- CheckPointManager manager(config_, &replica_communicator_, nullptr,
&sys_info);
+ CheckPointManager manager(config_, &replica_communicator_, nullptr,
+ &sys_info);
std::unique_ptr<Request> request = std::make_unique<Request>();
for (int i = 1; i <= 5; ++i) {
std::unique_ptr<Request> request = std::make_unique<Request>();
@@ -261,7 +264,8 @@ TEST_F(CheckPointManagerTest, Votes) {
std::future<bool> propose_done_future = propose_done.get_future();
SystemInfo sys_info;
- CheckPointManager manager(config_, &replica_communicator_, &mock_verifier,
&sys_info);
+ CheckPointManager manager(config_, &replica_communicator_, &mock_verifier,
+ &sys_info);
EXPECT_CALL(replica_communicator_, BroadCast)
.WillRepeatedly(Invoke([&](const google::protobuf::Message& message) {
for (int i = 1; i <= 3; ++i) {
diff --git a/platform/consensus/ordering/pbft/commitment_test.cpp
b/platform/consensus/ordering/pbft/commitment_test.cpp
index 156362e6..8ca54fe6 100644
--- a/platform/consensus/ordering/pbft/commitment_test.cpp
+++ b/platform/consensus/ordering/pbft/commitment_test.cpp
@@ -58,7 +58,8 @@ class CommitmentTest : public Test {
global_stats_(Stats::GetGlobalStats(1)),
config_(GenerateConfig()),
system_info_(config_),
- checkpoint_manager_(config_, &replica_communicator_, &verifier_,
&system_info_),
+ checkpoint_manager_(config_, &replica_communicator_, &verifier_,
+ &system_info_),
message_manager_(std::make_unique<MessageManager>(
config_, nullptr, &checkpoint_manager_, &system_info_)),
commitment_(
diff --git a/platform/consensus/ordering/pbft/query_test.cpp
b/platform/consensus/ordering/pbft/query_test.cpp
index b8639c6f..09056b87 100644
--- a/platform/consensus/ordering/pbft/query_test.cpp
+++ b/platform/consensus/ordering/pbft/query_test.cpp
@@ -59,7 +59,8 @@ class QueryTest : public Test {
: global_stats_(Stats::GetGlobalStats(1)),
config_(GenerateConfig()),
system_info_(config_),
- checkpoint_manager_(config_, &replica_communicator_, nullptr,
&system_info_),
+ checkpoint_manager_(config_, &replica_communicator_, nullptr,
+ &system_info_),
message_manager_(config_, nullptr, &checkpoint_manager_,
&system_info_),
recovery_(config_, &checkpoint_manager_, &system_info_, nullptr),
query_(config_, &recovery_),
diff --git a/platform/consensus/ordering/poe/framework/BUILD
b/platform/consensus/ordering/poe/framework/BUILD
index 3d697bc4..4a0bb12b 100644
--- a/platform/consensus/ordering/poe/framework/BUILD
+++ b/platform/consensus/ordering/poe/framework/BUILD
@@ -26,8 +26,19 @@ cc_library(
"//visibility:public",
],
deps = [
+ ":performance_manager",
"//common/utils",
"//platform/consensus/ordering/common/framework:consensus",
"//platform/consensus/ordering/poe/algorithm:poe",
],
)
+
+cc_library(
+ name = "performance_manager",
+ srcs = ["performance_manager.cpp"],
+ hdrs = ["performance_manager.h"],
+ deps = [
+ "//platform/consensus/ordering/common/framework:performance_manager",
+ "//platform/consensus/ordering/poe/proto:proposal_cc_proto",
+ ],
+)
diff --git a/platform/consensus/ordering/poe/framework/consensus.cpp
b/platform/consensus/ordering/poe/framework/consensus.cpp
index ef7c521e..955c34ea 100644
--- a/platform/consensus/ordering/poe/framework/consensus.cpp
+++ b/platform/consensus/ordering/poe/framework/consensus.cpp
@@ -27,12 +27,26 @@
namespace resdb {
namespace poe {
+std::unique_ptr<PoEPerformanceManager> Consensus::GetPerformanceManager() {
+ return config_.IsPerformanceRunning()
+ ? std::make_unique<PoEPerformanceManager>(
+ config_, GetBroadCastClient(), GetSignatureVerifier())
+ : nullptr;
+}
+
Consensus::Consensus(const ResDBConfig& config,
std::unique_ptr<TransactionManager> executor)
: common::Consensus(config, std::move(executor)) {
int total_replicas = config_.GetReplicaNum();
int f = (total_replicas - 1) / 3;
+ if (config_.GetPublicKeyCertificateInfo()
+ .public_key()
+ .public_key_info()
+ .type() == CertificateKeyInfo::CLIENT) {
+ SetPerformanceManager(GetPerformanceManager());
+ }
+
Init();
start_ = 0;
diff --git a/platform/consensus/ordering/poe/framework/consensus.h
b/platform/consensus/ordering/poe/framework/consensus.h
index 21830d97..0a857ada 100644
--- a/platform/consensus/ordering/poe/framework/consensus.h
+++ b/platform/consensus/ordering/poe/framework/consensus.h
@@ -22,6 +22,7 @@
#include "executor/common/transaction_manager.h"
#include "platform/consensus/ordering/common/framework/consensus.h"
#include "platform/consensus/ordering/poe/algorithm/poe.h"
+#include "platform/consensus/ordering/poe/framework/performance_manager.h"
#include "platform/networkstrate/consensus_manager.h"
namespace resdb {
@@ -41,6 +42,8 @@ class Consensus : public common::Consensus {
int Prepare(const Transaction& txn);
+ std::unique_ptr<PoEPerformanceManager> GetPerformanceManager();
+
protected:
std::unique_ptr<PoE> poe_;
Stats* global_stats_;
diff --git a/platform/consensus/ordering/poe/framework/consensus.h
b/platform/consensus/ordering/poe/framework/performance_manager.cpp
similarity index 50%
copy from platform/consensus/ordering/poe/framework/consensus.h
copy to platform/consensus/ordering/poe/framework/performance_manager.cpp
index 21830d97..ef527a57 100644
--- a/platform/consensus/ordering/poe/framework/consensus.h
+++ b/platform/consensus/ordering/poe/framework/performance_manager.cpp
@@ -17,37 +17,25 @@
* under the License.
*/
-#pragma once
+#include "platform/consensus/ordering/poe/framework/performance_manager.h"
-#include "executor/common/transaction_manager.h"
-#include "platform/consensus/ordering/common/framework/consensus.h"
-#include "platform/consensus/ordering/poe/algorithm/poe.h"
-#include "platform/networkstrate/consensus_manager.h"
+#include <glog/logging.h>
+
+#include "common/utils/utils.h"
namespace resdb {
namespace poe {
-class Consensus : public common::Consensus {
- public:
- Consensus(const ResDBConfig& config,
- std::unique_ptr<TransactionManager> transaction_manager);
- virtual ~Consensus() = default;
-
- private:
- int ProcessCustomConsensus(std::unique_ptr<Request> request) override;
- int ProcessNewTransaction(std::unique_ptr<Request> request) override;
- int CommitMsg(const google::protobuf::Message& msg) override;
- int CommitMsgInternal(const Transaction& txn);
-
- int Prepare(const Transaction& txn);
-
- protected:
- std::unique_ptr<PoE> poe_;
- Stats* global_stats_;
- int64_t start_;
- std::mutex mutex_;
- int send_num_[200];
-};
+using comm::CollectorResultCode;
+
+PoEPerformanceManager::PoEPerformanceManager(
+ const ResDBConfig& config, ReplicaCommunicator* replica_communicator,
+ SignatureVerifier* verifier)
+ : PerformanceManager(config, replica_communicator, verifier) {
+ f_ = config_.GetMaxMaliciousReplicaNum();
+}
+
+int PoEPerformanceManager::NeedResponse() { return 2 * f_ + 1; }
} // namespace poe
} // namespace resdb
diff --git a/platform/consensus/ordering/poe/framework/consensus.h
b/platform/consensus/ordering/poe/framework/performance_manager.h
similarity index 52%
copy from platform/consensus/ordering/poe/framework/consensus.h
copy to platform/consensus/ordering/poe/framework/performance_manager.h
index 21830d97..6987adb3 100644
--- a/platform/consensus/ordering/poe/framework/consensus.h
+++ b/platform/consensus/ordering/poe/framework/performance_manager.h
@@ -19,34 +19,24 @@
#pragma once
-#include "executor/common/transaction_manager.h"
-#include "platform/consensus/ordering/common/framework/consensus.h"
-#include "platform/consensus/ordering/poe/algorithm/poe.h"
-#include "platform/networkstrate/consensus_manager.h"
+#include <thread>
+
+#include "platform/consensus/ordering/common/framework/performance_manager.h"
+#include "platform/consensus/ordering/poe/proto/proposal.pb.h"
namespace resdb {
namespace poe {
-class Consensus : public common::Consensus {
+class PoEPerformanceManager : public common::PerformanceManager {
public:
- Consensus(const ResDBConfig& config,
- std::unique_ptr<TransactionManager> transaction_manager);
- virtual ~Consensus() = default;
+ PoEPerformanceManager(const ResDBConfig& config,
+ ReplicaCommunicator* replica_communicator,
+ SignatureVerifier* verifier);
+
+ int NeedResponse() override;
private:
- int ProcessCustomConsensus(std::unique_ptr<Request> request) override;
- int ProcessNewTransaction(std::unique_ptr<Request> request) override;
- int CommitMsg(const google::protobuf::Message& msg) override;
- int CommitMsgInternal(const Transaction& txn);
-
- int Prepare(const Transaction& txn);
-
- protected:
- std::unique_ptr<PoE> poe_;
- Stats* global_stats_;
- int64_t start_;
- std::mutex mutex_;
- int send_num_[200];
+ int f_;
};
} // namespace poe