This is an automated email from the ASF dual-hosted git repository.

junchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/master by this push:
     new dfdc9b62 Poe fix response (#225)
dfdc9b62 is described below

commit dfdc9b6217c09e59000f66f6b3784818153fa1d1
Author: cjcchen <[email protected]>
AuthorDate: Wed Jan 21 13:25:48 2026 +0800

    Poe fix response (#225)
    
    * change reponse num for poe
    
    * change reponse num for poe
    
    * add files
    
    * add files
---
 .../common/framework/performance_manager.cpp       |  6 +++-
 .../common/framework/performance_manager.h         |  1 +
 .../ordering/pbft/checkpoint_manager_test.cpp      | 12 ++++---
 .../consensus/ordering/pbft/commitment_test.cpp    |  3 +-
 platform/consensus/ordering/pbft/query_test.cpp    |  3 +-
 platform/consensus/ordering/poe/framework/BUILD    | 11 ++++++
 .../consensus/ordering/poe/framework/consensus.cpp | 14 ++++++++
 .../consensus/ordering/poe/framework/consensus.h   |  3 ++
 .../{consensus.h => performance_manager.cpp}       | 40 ++++++++--------------
 .../{consensus.h => performance_manager.h}         | 32 ++++++-----------
 10 files changed, 71 insertions(+), 54 deletions(-)

diff --git a/platform/consensus/ordering/common/framework/performance_manager.cpp b/platform/consensus/ordering/common/framework/performance_manager.cpp
index c07088f1..ebaf1d6a 100644
--- a/platform/consensus/ordering/common/framework/performance_manager.cpp
+++ b/platform/consensus/ordering/common/framework/performance_manager.cpp
@@ -69,6 +69,10 @@ PerformanceManager::~PerformanceManager() {
 
 int PerformanceManager::GetPrimary() { return primary_; }
 
+int PerformanceManager::NeedResponse() {
+  return config_.GetMinClientReceiveNum();  // f+1;
+}
+
 std::unique_ptr<Request> PerformanceManager::GenerateUserRequest() {
   std::unique_ptr<Request> request = std::make_unique<Request>();
   request->set_data(data_func_());
@@ -154,7 +158,7 @@ CollectorResultCode PerformanceManager::AddResponseMsg(
       return CollectorResultCode::OK;
     }
     response_[idx][seq]++;
-    if (response_[idx][seq] >= config_.GetMinClientReceiveNum()) {
+    if (response_[idx][seq] >= NeedResponse()) {
       response_[idx].erase(response_[idx].find(seq));
       done = true;
     }
diff --git a/platform/consensus/ordering/common/framework/performance_manager.h b/platform/consensus/ordering/common/framework/performance_manager.h
index a34f0573..c2dce102 100644
--- a/platform/consensus/ordering/common/framework/performance_manager.h
+++ b/platform/consensus/ordering/common/framework/performance_manager.h
@@ -46,6 +46,7 @@ class PerformanceManager {
 
  protected:
   virtual void SendMessage(const Request& request);
+  virtual int NeedResponse();
 
  private:
   // Add response messages which will be sent back to the caller
diff --git a/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp b/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
index 0f7ebc88..e13746c4 100644
--- a/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
+++ b/platform/consensus/ordering/pbft/checkpoint_manager_test.cpp
@@ -108,7 +108,8 @@ class CheckPointManagerTest : public Test {
 TEST_F(CheckPointManagerTest, SendCheckPoint) {
   config_.SetViewchangeCommitTimeout(100);
   SystemInfo sys_info;
-  CheckPointManager manager(config_, &replica_communicator_, nullptr, &sys_info);
+  CheckPointManager manager(config_, &replica_communicator_, nullptr,
+                            &sys_info);
 
   for (int i = 1; i <= 5; ++i) {
     std::unique_ptr<Request> request = std::make_unique<Request>();
@@ -135,7 +136,8 @@ TEST_F(CheckPointManagerTest, SendCheckPointOnce) {
   }));
 
   SystemInfo sys_info;
-  CheckPointManager manager(config_, &replica_communicator_, nullptr, &sys_info);
+  CheckPointManager manager(config_, &replica_communicator_, nullptr,
+                            &sys_info);
   for (int i = 1; i <= 5; ++i) {
     std::unique_ptr<Request> request = std::make_unique<Request>();
     request->set_seq(i);
@@ -163,7 +165,8 @@ TEST_F(CheckPointManagerTest, SendCheckPointTwo) {
   }));
 
   SystemInfo sys_info;
-  CheckPointManager manager(config_, &replica_communicator_, nullptr, &sys_info);
+  CheckPointManager manager(config_, &replica_communicator_, nullptr,
+                            &sys_info);
   std::unique_ptr<Request> request = std::make_unique<Request>();
   for (int i = 1; i <= 5; ++i) {
     std::unique_ptr<Request> request = std::make_unique<Request>();
@@ -261,7 +264,8 @@ TEST_F(CheckPointManagerTest, Votes) {
   std::future<bool> propose_done_future = propose_done.get_future();
 
   SystemInfo sys_info;
-  CheckPointManager manager(config_, &replica_communicator_, &mock_verifier, &sys_info);
+  CheckPointManager manager(config_, &replica_communicator_, &mock_verifier,
+                            &sys_info);
   EXPECT_CALL(replica_communicator_, BroadCast)
       .WillRepeatedly(Invoke([&](const google::protobuf::Message& message) {
         for (int i = 1; i <= 3; ++i) {
diff --git a/platform/consensus/ordering/pbft/commitment_test.cpp b/platform/consensus/ordering/pbft/commitment_test.cpp
index 156362e6..8ca54fe6 100644
--- a/platform/consensus/ordering/pbft/commitment_test.cpp
+++ b/platform/consensus/ordering/pbft/commitment_test.cpp
@@ -58,7 +58,8 @@ class CommitmentTest : public Test {
         global_stats_(Stats::GetGlobalStats(1)),
         config_(GenerateConfig()),
         system_info_(config_),
-        checkpoint_manager_(config_, &replica_communicator_, &verifier_, &system_info_),
+        checkpoint_manager_(config_, &replica_communicator_, &verifier_,
+                            &system_info_),
         message_manager_(std::make_unique<MessageManager>(
             config_, nullptr, &checkpoint_manager_, &system_info_)),
         commitment_(
diff --git a/platform/consensus/ordering/pbft/query_test.cpp b/platform/consensus/ordering/pbft/query_test.cpp
index b8639c6f..09056b87 100644
--- a/platform/consensus/ordering/pbft/query_test.cpp
+++ b/platform/consensus/ordering/pbft/query_test.cpp
@@ -59,7 +59,8 @@ class QueryTest : public Test {
       : global_stats_(Stats::GetGlobalStats(1)),
         config_(GenerateConfig()),
         system_info_(config_),
-        checkpoint_manager_(config_, &replica_communicator_, nullptr, &system_info_),
+        checkpoint_manager_(config_, &replica_communicator_, nullptr,
+                            &system_info_),
         message_manager_(config_, nullptr, &checkpoint_manager_, &system_info_),
         recovery_(config_, &checkpoint_manager_, &system_info_, nullptr),
         query_(config_, &recovery_),
diff --git a/platform/consensus/ordering/poe/framework/BUILD b/platform/consensus/ordering/poe/framework/BUILD
index 3d697bc4..4a0bb12b 100644
--- a/platform/consensus/ordering/poe/framework/BUILD
+++ b/platform/consensus/ordering/poe/framework/BUILD
@@ -26,8 +26,19 @@ cc_library(
         "//visibility:public",
     ],
     deps = [
+        ":performance_manager",
         "//common/utils",
         "//platform/consensus/ordering/common/framework:consensus",
         "//platform/consensus/ordering/poe/algorithm:poe",
     ],
 )
+
+cc_library(
+    name = "performance_manager",
+    srcs = ["performance_manager.cpp"],
+    hdrs = ["performance_manager.h"],
+    deps = [
+        "//platform/consensus/ordering/common/framework:performance_manager",
+        "//platform/consensus/ordering/poe/proto:proposal_cc_proto",
+    ],
+)
diff --git a/platform/consensus/ordering/poe/framework/consensus.cpp b/platform/consensus/ordering/poe/framework/consensus.cpp
index ef7c521e..955c34ea 100644
--- a/platform/consensus/ordering/poe/framework/consensus.cpp
+++ b/platform/consensus/ordering/poe/framework/consensus.cpp
@@ -27,12 +27,26 @@
 namespace resdb {
 namespace poe {
 
+std::unique_ptr<PoEPerformanceManager> Consensus::GetPerformanceManager() {
+  return config_.IsPerformanceRunning()
+             ? std::make_unique<PoEPerformanceManager>(
+                   config_, GetBroadCastClient(), GetSignatureVerifier())
+             : nullptr;
+}
+
 Consensus::Consensus(const ResDBConfig& config,
                      std::unique_ptr<TransactionManager> executor)
     : common::Consensus(config, std::move(executor)) {
   int total_replicas = config_.GetReplicaNum();
   int f = (total_replicas - 1) / 3;
 
+  if (config_.GetPublicKeyCertificateInfo()
+          .public_key()
+          .public_key_info()
+          .type() == CertificateKeyInfo::CLIENT) {
+    SetPerformanceManager(GetPerformanceManager());
+  }
+
   Init();
 
   start_ = 0;
diff --git a/platform/consensus/ordering/poe/framework/consensus.h b/platform/consensus/ordering/poe/framework/consensus.h
index 21830d97..0a857ada 100644
--- a/platform/consensus/ordering/poe/framework/consensus.h
+++ b/platform/consensus/ordering/poe/framework/consensus.h
@@ -22,6 +22,7 @@
 #include "executor/common/transaction_manager.h"
 #include "platform/consensus/ordering/common/framework/consensus.h"
 #include "platform/consensus/ordering/poe/algorithm/poe.h"
+#include "platform/consensus/ordering/poe/framework/performance_manager.h"
 #include "platform/networkstrate/consensus_manager.h"
 
 namespace resdb {
@@ -41,6 +42,8 @@ class Consensus : public common::Consensus {
 
   int Prepare(const Transaction& txn);
 
+  std::unique_ptr<PoEPerformanceManager> GetPerformanceManager();
+
  protected:
   std::unique_ptr<PoE> poe_;
   Stats* global_stats_;
diff --git a/platform/consensus/ordering/poe/framework/consensus.h b/platform/consensus/ordering/poe/framework/performance_manager.cpp
similarity index 50%
copy from platform/consensus/ordering/poe/framework/consensus.h
copy to platform/consensus/ordering/poe/framework/performance_manager.cpp
index 21830d97..ef527a57 100644
--- a/platform/consensus/ordering/poe/framework/consensus.h
+++ b/platform/consensus/ordering/poe/framework/performance_manager.cpp
@@ -17,37 +17,25 @@
  * under the License.
  */
 
-#pragma once
+#include "platform/consensus/ordering/poe/framework/performance_manager.h"
 
-#include "executor/common/transaction_manager.h"
-#include "platform/consensus/ordering/common/framework/consensus.h"
-#include "platform/consensus/ordering/poe/algorithm/poe.h"
-#include "platform/networkstrate/consensus_manager.h"
+#include <glog/logging.h>
+
+#include "common/utils/utils.h"
 
 namespace resdb {
 namespace poe {
 
-class Consensus : public common::Consensus {
- public:
-  Consensus(const ResDBConfig& config,
-            std::unique_ptr<TransactionManager> transaction_manager);
-  virtual ~Consensus() = default;
-
- private:
-  int ProcessCustomConsensus(std::unique_ptr<Request> request) override;
-  int ProcessNewTransaction(std::unique_ptr<Request> request) override;
-  int CommitMsg(const google::protobuf::Message& msg) override;
-  int CommitMsgInternal(const Transaction& txn);
-
-  int Prepare(const Transaction& txn);
-
- protected:
-  std::unique_ptr<PoE> poe_;
-  Stats* global_stats_;
-  int64_t start_;
-  std::mutex mutex_;
-  int send_num_[200];
-};
+using comm::CollectorResultCode;
+
+PoEPerformanceManager::PoEPerformanceManager(
+    const ResDBConfig& config, ReplicaCommunicator* replica_communicator,
+    SignatureVerifier* verifier)
+    : PerformanceManager(config, replica_communicator, verifier) {
+  f_ = config_.GetMaxMaliciousReplicaNum();
+}
+
+int PoEPerformanceManager::NeedResponse() { return 2 * f_ + 1; }
 
 }  // namespace poe
 }  // namespace resdb
diff --git a/platform/consensus/ordering/poe/framework/consensus.h b/platform/consensus/ordering/poe/framework/performance_manager.h
similarity index 52%
copy from platform/consensus/ordering/poe/framework/consensus.h
copy to platform/consensus/ordering/poe/framework/performance_manager.h
index 21830d97..6987adb3 100644
--- a/platform/consensus/ordering/poe/framework/consensus.h
+++ b/platform/consensus/ordering/poe/framework/performance_manager.h
@@ -19,34 +19,24 @@
 
 #pragma once
 
-#include "executor/common/transaction_manager.h"
-#include "platform/consensus/ordering/common/framework/consensus.h"
-#include "platform/consensus/ordering/poe/algorithm/poe.h"
-#include "platform/networkstrate/consensus_manager.h"
+#include <thread>
+
+#include "platform/consensus/ordering/common/framework/performance_manager.h"
+#include "platform/consensus/ordering/poe/proto/proposal.pb.h"
 
 namespace resdb {
 namespace poe {
 
-class Consensus : public common::Consensus {
+class PoEPerformanceManager : public common::PerformanceManager {
  public:
-  Consensus(const ResDBConfig& config,
-            std::unique_ptr<TransactionManager> transaction_manager);
-  virtual ~Consensus() = default;
+  PoEPerformanceManager(const ResDBConfig& config,
+                        ReplicaCommunicator* replica_communicator,
+                        SignatureVerifier* verifier);
+
+  int NeedResponse() override;
 
  private:
-  int ProcessCustomConsensus(std::unique_ptr<Request> request) override;
-  int ProcessNewTransaction(std::unique_ptr<Request> request) override;
-  int CommitMsg(const google::protobuf::Message& msg) override;
-  int CommitMsgInternal(const Transaction& txn);
-
-  int Prepare(const Transaction& txn);
-
- protected:
-  std::unique_ptr<PoE> poe_;
-  Stats* global_stats_;
-  int64_t start_;
-  std::mutex mutex_;
-  int send_num_[200];
+  int f_;
 };
 
 }  // namespace poe

Reply via email to