This is an automated email from the ASF dual-hosted git repository.

junchao pushed a commit to branch smart_contract_merge
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/smart_contract_merge by this 
push:
     new 01eb08b8 merge master
01eb08b8 is described below

commit 01eb08b838419c6a73563ff38b4e196eaace0ac8
Author: junchao <junchao@localhost>
AuthorDate: Thu Feb 13 12:42:58 2025 -0700

    merge master
---
 .../consensus/ordering/poe/proto/algorithm/BUILD   |  29 ---
 .../ordering/poe/proto/algorithm/protocol_base.cpp |  65 -----
 .../ordering/poe/proto/algorithm/protocol_base.h   |  87 -------
 .../consensus/ordering/poe/proto/framework/BUILD   |  66 -----
 .../ordering/poe/proto/framework/consensus.cpp     | 168 -------------
 .../ordering/poe/proto/framework/consensus.h       |  78 ------
 .../poe/proto/framework/performance_manager.cpp    | 276 ---------------------
 .../poe/proto/framework/performance_manager.h      |  97 --------
 .../poe/proto/framework/response_manager.cpp       | 236 ------------------
 .../poe/proto/framework/response_manager.h         |  79 ------
 .../poe/proto/framework/transaction_utils.cpp      |  43 ----
 .../poe/proto/framework/transaction_utils.h        |  39 ---
 12 files changed, 1263 deletions(-)

diff --git a/platform/consensus/ordering/poe/proto/algorithm/BUILD b/platform/consensus/ordering/poe/proto/algorithm/BUILD
deleted file mode 100644
index 0d7c5d19..00000000
--- a/platform/consensus/ordering/poe/proto/algorithm/BUILD
+++ /dev/null
@@ -1,29 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-package(default_visibility = ["//platform/consensus/ordering:__subpackages__"])
-
-cc_library(
-    name = "protocol_base",
-    srcs = ["protocol_base.cpp"],
-    hdrs = ["protocol_base.h"],
-    deps = [
-        "//common:comm",
-        "//common/crypto:signature_verifier",
-    ],
-)
diff --git a/platform/consensus/ordering/poe/proto/algorithm/protocol_base.cpp b/platform/consensus/ordering/poe/proto/algorithm/protocol_base.cpp
deleted file mode 100644
index cababed4..00000000
--- a/platform/consensus/ordering/poe/proto/algorithm/protocol_base.cpp
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "platform/consensus/ordering/common/algorithm/protocol_base.h"
-
-#include <glog/logging.h>
-
-namespace resdb {
-namespace common {
-
-ProtocolBase::ProtocolBase(int id, int f, int total_num,
-                           SingleCallFuncType single_call,
-                           BroadcastCallFuncType broadcast_call,
-                           CommitFuncType commit)
-    : id_(id),
-      f_(f),
-      total_num_(total_num),
-      single_call_(single_call),
-      broadcast_call_(broadcast_call),
-      commit_(commit) {
-  stop_ = false;
-}
-
-ProtocolBase::ProtocolBase(int id, int f, int total_num)
-    : ProtocolBase(id, f, total_num, nullptr, nullptr, nullptr) {}
-
-ProtocolBase::~ProtocolBase() { Stop(); }
-
-void ProtocolBase::Stop() { stop_ = true; }
-
-bool ProtocolBase::IsStop() { return stop_; }
-
-int ProtocolBase::SendMessage(int msg_type,
-                              const google::protobuf::Message& msg,
-                              int node_id) {
-  return single_call_(msg_type, msg, node_id);
-}
-
-int ProtocolBase::Broadcast(int msg_type,
-                            const google::protobuf::Message& msg) {
-  return broadcast_call_(msg_type, msg);
-}
-
-int ProtocolBase::Commit(const google::protobuf::Message& msg) {
-  return commit_(msg);
-}
-
-}  // namespace common
-}  // namespace resdb
diff --git a/platform/consensus/ordering/poe/proto/algorithm/protocol_base.h b/platform/consensus/ordering/poe/proto/algorithm/protocol_base.h
deleted file mode 100644
index f8e47052..00000000
--- a/platform/consensus/ordering/poe/proto/algorithm/protocol_base.h
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#pragma once
-
-#include <google/protobuf/message.h>
-
-#include <functional>
-
-#include "common/crypto/signature_verifier.h"
-
-namespace resdb {
-namespace common {
-
-class ProtocolBase {
- public:
-  typedef std::function<int(int, const google::protobuf::Message& msg, int)>
-      SingleCallFuncType;
-  typedef std::function<int(int, const google::protobuf::Message& msg)>
-      BroadcastCallFuncType;
-  typedef std::function<int(const google::protobuf::Message& msg)>
-      CommitFuncType;
-
-  ProtocolBase(int id, int f, int total_num, SingleCallFuncType single_call,
-               BroadcastCallFuncType broadcast_call, CommitFuncType commit);
-
-  ProtocolBase(int id, int f, int total_num);
-
-  virtual ~ProtocolBase();
-
-  void Stop();
-
-  inline void SetSingleCallFunc(SingleCallFuncType single_call) {
-    single_call_ = single_call;
-  }
-
-  inline void SetBroadcastCallFunc(BroadcastCallFuncType broadcast_call) {
-    broadcast_call_ = broadcast_call;
-  }
-
-  inline void SetCommitFunc(CommitFuncType commit_func) {
-    commit_ = commit_func;
-  }
-
-  inline void SetSignatureVerifier(SignatureVerifier* verifier) {
-    verifier_ = verifier;
-  }
-
- protected:
-  int SendMessage(int msg_type, const google::protobuf::Message& msg,
-                  int node_id);
-  int Broadcast(int msg_type, const google::protobuf::Message& msg);
-  int Commit(const google::protobuf::Message& msg);
-
-  bool IsStop();
-
- protected:
-  int id_;
-  int f_;
-  int total_num_;
-  std::function<int(int, const google::protobuf::Message& msg, int)>
-      single_call_;
-  std::function<int(int, const google::protobuf::Message& msg)> broadcast_call_;
-  std::function<int(const google::protobuf::Message& msg)> commit_;
-  std::atomic<bool> stop_;
-
-  SignatureVerifier* verifier_;
-};
-
-}  // namespace common
-}  // namespace resdb
diff --git a/platform/consensus/ordering/poe/proto/framework/BUILD b/platform/consensus/ordering/poe/proto/framework/BUILD
deleted file mode 100644
index c56c9260..00000000
--- a/platform/consensus/ordering/poe/proto/framework/BUILD
+++ /dev/null
@@ -1,66 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-package(default_visibility = ["//platform/consensus/ordering:__subpackages__"])
-
-cc_library(
-    name = "consensus",
-    srcs = ["consensus.cpp"],
-    hdrs = ["consensus.h"],
-    deps = [
-        ":performance_manager",
-        ":response_manager",
-        "//common/utils",
-        "//executor/common:transaction_manager",
-        "//platform/consensus/execution:transaction_executor",
-        "//platform/consensus/ordering/common/algorithm:protocol_base",
-        "//platform/networkstrate:consensus_manager",
-    ],
-)
-
-cc_library(
-    name = "performance_manager",
-    srcs = ["performance_manager.cpp"],
-    hdrs = ["performance_manager.h"],
-    deps = [
-        ":transaction_utils",
-        "//platform/networkstrate:replica_communicator",
-        "//platform/networkstrate:server_comm",
-    ],
-)
-
-cc_library(
-    name = "response_manager",
-    srcs = ["response_manager.cpp"],
-    hdrs = ["response_manager.h"],
-    deps = [
-        ":transaction_utils",
-        "//platform/networkstrate:replica_communicator",
-        "//platform/networkstrate:server_comm",
-    ],
-)
-
-cc_library(
-    name = "transaction_utils",
-    srcs = ["transaction_utils.cpp"],
-    hdrs = ["transaction_utils.h"],
-    visibility = ["//visibility:public"],
-    deps = [
-        "//platform/proto:resdb_cc_proto",
-    ],
-)
diff --git a/platform/consensus/ordering/poe/proto/framework/consensus.cpp b/platform/consensus/ordering/poe/proto/framework/consensus.cpp
deleted file mode 100644
index 93e00cc8..00000000
--- a/platform/consensus/ordering/poe/proto/framework/consensus.cpp
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "platform/consensus/ordering/common/framework/consensus.h"
-
-#include <glog/logging.h>
-#include <unistd.h>
-
-#include "common/utils/utils.h"
-
-namespace resdb {
-namespace common {
-
-Consensus::Consensus(const ResDBConfig& config,
-                     std::unique_ptr<TransactionManager> executor)
-    : ConsensusManager(config),
-      replica_communicator_(GetBroadCastClient()),
-      transaction_executor_(std::make_unique<TransactionExecutor>(
-          config,
-          [&](std::unique_ptr<Request> request,
-              std::unique_ptr<BatchUserResponse> resp_msg) {
-            ResponseMsg(*resp_msg);
-          },
-          nullptr, std::move(executor))) {
-  LOG(INFO) << "is running is performance mode:"
-            << config_.IsPerformanceRunning();
-  is_stop_ = false;
-  global_stats_ = Stats::GetGlobalStats();
-}
-
-void Consensus::Init() {
-  if (performance_manager_ == nullptr) {
-    performance_manager_ =
-        config_.IsPerformanceRunning()
-            ? std::make_unique<PerformanceManager>(
-                  config_, GetBroadCastClient(), GetSignatureVerifier())
-            : nullptr;
-  }
-
-  if (response_manager_ == nullptr) {
-    response_manager_ =
-        !config_.IsPerformanceRunning()
-            ? std::make_unique<ResponseManager>(config_, GetBroadCastClient(),
-                                                GetSignatureVerifier())
-            : nullptr;
-  }
-}
-
-void Consensus::InitProtocol(ProtocolBase* protocol) {
-  protocol->SetSingleCallFunc(
-      [&](int type, const google::protobuf::Message& msg, int node_id) {
-        return SendMsg(type, msg, node_id);
-      });
-
-  protocol->SetBroadcastCallFunc(
-      [&](int type, const google::protobuf::Message& msg) {
-        return Broadcast(type, msg);
-      });
-
-  protocol->SetCommitFunc(
-      [&](const google::protobuf::Message& msg) { return CommitMsg(msg); });
-}
-
-Consensus::~Consensus() { is_stop_ = true; }
-
-void Consensus::SetPerformanceManager(
-    std::unique_ptr<PerformanceManager> performance_manager) {
-  performance_manager_ = std::move(performance_manager);
-}
-
-bool Consensus::IsStop() { return is_stop_; }
-
-void Consensus::SetupPerformanceDataFunc(std::function<std::string()> func) {
-  performance_manager_->SetDataFunc(func);
-}
-
-void Consensus::SetCommunicator(ReplicaCommunicator* replica_communicator) {
-  replica_communicator_ = replica_communicator;
-}
-
-int Consensus::Broadcast(int type, const google::protobuf::Message& msg) {
-  Request request;
-  msg.SerializeToString(request.mutable_data());
-  request.set_type(Request::TYPE_CUSTOM_CONSENSUS);
-  request.set_user_type(type);
-  request.set_sender_id(config_.GetSelfInfo().id());
-
-  replica_communicator_->BroadCast(request);
-  return 0;
-}
-
-int Consensus::SendMsg(int type, const google::protobuf::Message& msg,
-                       int node_id) {
-  Request request;
-  msg.SerializeToString(request.mutable_data());
-  request.set_type(Request::TYPE_CUSTOM_CONSENSUS);
-  request.set_user_type(type);
-  request.set_sender_id(config_.GetSelfInfo().id());
-  replica_communicator_->SendMessage(request, node_id);
-  return 0;
-}
-
-std::vector<ReplicaInfo> Consensus::GetReplicas() {
-  return config_.GetReplicaInfos();
-}
-
-int Consensus::CommitMsg(const google::protobuf::Message& txn) { return 0; }
-
-// The implementation of PBFT.
-int Consensus::ConsensusCommit(std::unique_ptr<Context> context,
-                               std::unique_ptr<Request> request) {
-  switch (request->type()) {
-    case Request::TYPE_CLIENT_REQUEST:
-      if (config_.IsPerformanceRunning()) {
-        return performance_manager_->StartEval();
-      }
-    case Request::TYPE_RESPONSE:
-      if (config_.IsPerformanceRunning()) {
-        return performance_manager_->ProcessResponseMsg(std::move(context),
-                                                        std::move(request));
-      }
-    case Request::TYPE_NEW_TXNS: {
-      return ProcessNewTransaction(std::move(request));
-    }
-    case Request::TYPE_CUSTOM_CONSENSUS: {
-      return ProcessCustomConsensus(std::move(request));
-    }
-  }
-  return 0;
-}
-
-int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) {
-  return 0;
-}
-
-int Consensus::ProcessNewTransaction(std::unique_ptr<Request> request) {
-  return 0;
-}
-
-int Consensus::ResponseMsg(const BatchUserResponse& batch_resp) {
-  Request request;
-  request.set_seq(batch_resp.seq());
-  request.set_type(Request::TYPE_RESPONSE);
-  request.set_sender_id(config_.GetSelfInfo().id());
-  request.set_proxy_id(batch_resp.proxy_id());
-  batch_resp.SerializeToString(request.mutable_data());
-  replica_communicator_->SendMessage(request, request.proxy_id());
-  return 0;
-}
-
-}  // namespace common
-}  // namespace resdb
diff --git a/platform/consensus/ordering/poe/proto/framework/consensus.h b/platform/consensus/ordering/poe/proto/framework/consensus.h
deleted file mode 100644
index 2f2884b8..00000000
--- a/platform/consensus/ordering/poe/proto/framework/consensus.h
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#pragma once
-
-#include "executor/common/transaction_manager.h"
-#include "platform/consensus/execution/transaction_executor.h"
-#include "platform/consensus/ordering/common/algorithm/protocol_base.h"
-#include "platform/consensus/ordering/common/framework/performance_manager.h"
-#include "platform/consensus/ordering/common/framework/response_manager.h"
-#include "platform/networkstrate/consensus_manager.h"
-
-namespace resdb {
-namespace common {
-
-class Consensus : public ConsensusManager {
- public:
-  Consensus(const ResDBConfig& config,
-            std::unique_ptr<TransactionManager> transaction_manager);
-  virtual ~Consensus();
-
-  int ConsensusCommit(std::unique_ptr<Context> context,
-                      std::unique_ptr<Request> request) override;
-  std::vector<ReplicaInfo> GetReplicas() override;
-
-  void SetupPerformanceDataFunc(std::function<std::string()> func);
-
-  void SetCommunicator(ReplicaCommunicator* replica_communicator);
-
-  void InitProtocol(ProtocolBase* protocol);
-
- protected:
-  virtual int ProcessCustomConsensus(std::unique_ptr<Request> request);
-  virtual int ProcessNewTransaction(std::unique_ptr<Request> request);
-  virtual int CommitMsg(const google::protobuf::Message& msg);
-
- protected:
-  int SendMsg(int type, const google::protobuf::Message& msg, int node_id);
-  int Broadcast(int type, const google::protobuf::Message& msg);
-  int ResponseMsg(const BatchUserResponse& batch_resp);
-  void AsyncSend();
-  bool IsStop();
-
- protected:
-  void Init();
-  void SetPerformanceManager(
-      std::unique_ptr<PerformanceManager> performance_manger);
-
- protected:
-  ReplicaCommunicator* replica_communicator_;
-  std::unique_ptr<PerformanceManager> performance_manager_;
-  std::unique_ptr<ResponseManager> response_manager_;
-  std::unique_ptr<TransactionExecutor> transaction_executor_;
-  Stats* global_stats_;
-
-  LockFreeQueue<BatchUserResponse> resp_queue_;
-  std::vector<std::thread> send_thread_;
-  bool is_stop_;
-};
-
-}  // namespace common
-}  // namespace resdb
diff --git a/platform/consensus/ordering/poe/proto/framework/performance_manager.cpp b/platform/consensus/ordering/poe/proto/framework/performance_manager.cpp
deleted file mode 100644
index c07088f1..00000000
--- a/platform/consensus/ordering/poe/proto/framework/performance_manager.cpp
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "platform/consensus/ordering/common/framework/performance_manager.h"
-
-#include <glog/logging.h>
-
-#include "common/utils/utils.h"
-
-namespace resdb {
-namespace common {
-
-using comm::CollectorResultCode;
-
-PerformanceManager::PerformanceManager(
-    const ResDBConfig& config, ReplicaCommunicator* replica_communicator,
-    SignatureVerifier* verifier)
-    : config_(config),
-      replica_communicator_(replica_communicator),
-      batch_queue_("user request"),
-      verifier_(verifier) {
-  stop_ = false;
-  eval_started_ = false;
-  eval_ready_future_ = eval_ready_promise_.get_future();
-  if (config_.GetPublicKeyCertificateInfo()
-          .public_key()
-          .public_key_info()
-          .type() == CertificateKeyInfo::CLIENT) {
-    for (int i = 0; i < 1; ++i) {
-      user_req_thread_[i] =
-          std::thread(&PerformanceManager::BatchProposeMsg, this);
-    }
-  }
-  global_stats_ = Stats::GetGlobalStats();
-  send_num_ = 0;
-  total_num_ = 0;
-  replica_num_ = config_.GetReplicaNum();
-  id_ = config_.GetSelfInfo().id();
-  primary_ = id_ % replica_num_;
-  if (primary_ == 0) primary_ = replica_num_;
-  local_id_ = 1;
-  sum_ = 0;
-}
-
-PerformanceManager::~PerformanceManager() {
-  stop_ = true;
-  for (int i = 0; i < 16; ++i) {
-    if (user_req_thread_[i].joinable()) {
-      user_req_thread_[i].join();
-    }
-  }
-}
-
-int PerformanceManager::GetPrimary() { return primary_; }
-
-std::unique_ptr<Request> PerformanceManager::GenerateUserRequest() {
-  std::unique_ptr<Request> request = std::make_unique<Request>();
-  request->set_data(data_func_());
-  return request;
-}
-
-void PerformanceManager::SetDataFunc(std::function<std::string()> func) {
-  data_func_ = std::move(func);
-}
-
-int PerformanceManager::StartEval() {
-  if (eval_started_) {
-    return 0;
-  }
-  eval_started_ = true;
-  for (int i = 0; i < 100000000; ++i) {
-    std::unique_ptr<QueueItem> queue_item = std::make_unique<QueueItem>();
-    queue_item->context = nullptr;
-    queue_item->user_request = GenerateUserRequest();
-    batch_queue_.Push(std::move(queue_item));
-    if (i == 2000000) {
-      eval_ready_promise_.set_value(true);
-    }
-  }
-  LOG(WARNING) << "start eval done";
-  return 0;
-}
-
-// =================== response ========================
-// handle the response message. If receive f+1 commit messages, send back to the
-// user.
-int PerformanceManager::ProcessResponseMsg(std::unique_ptr<Context> context,
-                                           std::unique_ptr<Request> request) {
-  std::unique_ptr<Request> response;
-  // Add the response message, and use the call back to collect the received
-  // messages.
-  // The callback will be triggered if it received f+1 messages.
-  if (request->ret() == -2) {
-    // LOG(INFO) << "get response fail:" << request->ret();
-    send_num_--;
-    return 0;
-  }
-
-  // LOG(INFO) << "get response:" << request->seq() << "
-  // sender:"<<request->sender_id();
-  std::unique_ptr<BatchUserResponse> batch_response = nullptr;
-  CollectorResultCode ret = AddResponseMsg(
-      std::move(request), [&](std::unique_ptr<BatchUserResponse> request) {
-        batch_response = std::move(request);
-        return;
-      });
-
-  if (ret == CollectorResultCode::STATE_CHANGED) {
-    assert(batch_response);
-    SendResponseToClient(*batch_response);
-  }
-  return ret == CollectorResultCode::INVALID ? -2 : 0;
-}
-
-CollectorResultCode PerformanceManager::AddResponseMsg(
-    std::unique_ptr<Request> request,
-    std::function<void(std::unique_ptr<BatchUserResponse>)>
-        response_call_back) {
-  if (request == nullptr) {
-    return CollectorResultCode::INVALID;
-  }
-
-  std::unique_ptr<BatchUserResponse> batch_response =
-      std::make_unique<BatchUserResponse>();
-  if (!batch_response->ParseFromString(request->data())) {
-    LOG(ERROR) << "parse response fail:" << request->data().size()
-               << " seq:" << request->seq();
-    return CollectorResultCode::INVALID;
-  }
-
-  uint64_t seq = batch_response->local_id();
-
-  bool done = false;
-  {
-    int idx = seq % response_set_size_;
-    std::unique_lock<std::mutex> lk(response_lock_[idx]);
-    if (response_[idx].find(seq) == response_[idx].end()) {
-      return CollectorResultCode::OK;
-    }
-    response_[idx][seq]++;
-    if (response_[idx][seq] >= config_.GetMinClientReceiveNum()) {
-      response_[idx].erase(response_[idx].find(seq));
-      done = true;
-    }
-  }
-  if (done) {
-    response_call_back(std::move(batch_response));
-    return CollectorResultCode::STATE_CHANGED;
-  }
-  return CollectorResultCode::OK;
-}
-
-void PerformanceManager::SendResponseToClient(
-    const BatchUserResponse& batch_response) {
-  uint64_t create_time = batch_response.createtime();
-  if (create_time > 0) {
-    uint64_t run_time = GetCurrentTime() - create_time;
-    LOG(ERROR) << "receive current:" << GetCurrentTime()
-               << " create time:" << create_time << " run time:" << run_time
-               << " local id:" << batch_response.local_id();
-    global_stats_->AddLatency(run_time);
-  }
-  send_num_--;
-}
-
-// =================== request ========================
-int PerformanceManager::BatchProposeMsg() {
-  LOG(WARNING) << "batch wait time:" << config_.ClientBatchWaitTimeMS()
-               << " batch num:" << config_.ClientBatchNum()
-               << " max txn:" << config_.GetMaxProcessTxn();
-  std::vector<std::unique_ptr<QueueItem>> batch_req;
-  eval_ready_future_.get();
-  bool start = false;
-  while (!stop_) {
-    if (send_num_ > config_.GetMaxProcessTxn()) {
-      usleep(100000);
-      continue;
-    }
-    if (batch_req.size() < config_.ClientBatchNum()) {
-      std::unique_ptr<QueueItem> item =
-          batch_queue_.Pop(config_.ClientBatchWaitTimeMS());
-      if (item == nullptr) {
-        if (start) {
-          LOG(ERROR) << "no data";
-        }
-        continue;
-      }
-      batch_req.push_back(std::move(item));
-      if (batch_req.size() < config_.ClientBatchNum()) {
-        continue;
-      }
-    }
-    start = true;
-    DoBatch(batch_req);
-    batch_req.clear();
-  }
-  return 0;
-}
-
-int PerformanceManager::DoBatch(
-    const std::vector<std::unique_ptr<QueueItem>>& batch_req) {
-  auto new_request = comm::NewRequest(Request::TYPE_NEW_TXNS, Request(),
-                                      config_.GetSelfInfo().id());
-  if (new_request == nullptr) {
-    return -2;
-  }
-
-  BatchUserRequest batch_request;
-  for (size_t i = 0; i < batch_req.size(); ++i) {
-    BatchUserRequest::UserRequest* req = batch_request.add_user_requests();
-    *req->mutable_request() = *batch_req[i]->user_request.get();
-    req->set_id(i);
-  }
-
-  batch_request.set_local_id(local_id_++);
-
-  {
-    int idx = batch_request.local_id() % response_set_size_;
-    std::unique_lock<std::mutex> lk(response_lock_[idx]);
-    response_[idx][batch_request.local_id()]++;
-  }
-
-  batch_request.set_proxy_id(config_.GetSelfInfo().id());
-  batch_request.set_createtime(GetCurrentTime());
-  batch_request.SerializeToString(new_request->mutable_data());
-  if (verifier_) {
-    auto signature_or = verifier_->SignMessage(new_request->data());
-    if (!signature_or.ok()) {
-      LOG(ERROR) << "Sign message fail";
-      return -2;
-    }
-    *new_request->mutable_data_signature() = *signature_or;
-  }
-
-  new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data()));
-  new_request->set_proxy_id(config_.GetSelfInfo().id());
-  new_request->set_user_seq(batch_request.local_id());
-
-  SendMessage(*new_request);
-
-  global_stats_->BroadCastMsg();
-  send_num_++;
-  sum_ += batch_req.size();
-  if (total_num_++ == 1000000) {
-    stop_ = true;
-    LOG(WARNING) << "total num is done:" << total_num_;
-  }
-  if (total_num_ % 1000 == 0) {
-    LOG(WARNING) << "total num is :" << total_num_;
-  }
-  global_stats_->IncClientCall();
-  return 0;
-}
-
-void PerformanceManager::SendMessage(const Request& request) {
-  replica_communicator_->SendMessage(request, GetPrimary());
-}
-
-}  // namespace common
-}  // namespace resdb
diff --git a/platform/consensus/ordering/poe/proto/framework/performance_manager.h b/platform/consensus/ordering/poe/proto/framework/performance_manager.h
deleted file mode 100644
index a34f0573..00000000
--- a/platform/consensus/ordering/poe/proto/framework/performance_manager.h
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#pragma once
-
-#include <future>
-
-#include "platform/config/resdb_config.h"
-#include "platform/consensus/ordering/common/framework/transaction_utils.h"
-#include "platform/networkstrate/replica_communicator.h"
-#include "platform/networkstrate/server_comm.h"
-#include "platform/statistic/stats.h"
-
-namespace resdb {
-namespace common {
-
-class PerformanceManager {
- public:
-  PerformanceManager(const ResDBConfig& config,
-                     ReplicaCommunicator* replica_communicator,
-                     SignatureVerifier* verifier);
-
-  virtual ~PerformanceManager();
-
-  int StartEval();
-
-  int ProcessResponseMsg(std::unique_ptr<Context> context,
-                         std::unique_ptr<Request> request);
-  void SetDataFunc(std::function<std::string()> func);
-
- protected:
-  virtual void SendMessage(const Request& request);
-
- private:
-  // Add response messages which will be sent back to the caller
-  // if there are f+1 same messages.
-  comm::CollectorResultCode AddResponseMsg(
-      std::unique_ptr<Request> request,
-      std::function<void(std::unique_ptr<BatchUserResponse>)> call_back);
-  void SendResponseToClient(const BatchUserResponse& batch_response);
-
-  struct QueueItem {
-    std::unique_ptr<Context> context;
-    std::unique_ptr<Request> user_request;
-  };
-  int DoBatch(const std::vector<std::unique_ptr<QueueItem>>& batch_req);
-  int BatchProposeMsg();
-  int GetPrimary();
-  std::unique_ptr<Request> GenerateUserRequest();
-
- protected:
-  ResDBConfig config_;
-  ReplicaCommunicator* replica_communicator_;
-
- private:
-  LockFreeQueue<QueueItem> batch_queue_;
-  std::thread user_req_thread_[16];
-  std::atomic<bool> stop_;
-  Stats* global_stats_;
-  std::atomic<int> send_num_;
-  std::mutex mutex_;
-  std::atomic<int> total_num_;
-  SignatureVerifier* verifier_;
-  SignatureInfo sig_;
-  std::function<std::string()> data_func_;
-  std::future<bool> eval_ready_future_;
-  std::promise<bool> eval_ready_promise_;
-  std::atomic<bool> eval_started_;
-  std::atomic<int> fail_num_;
-  static const int response_set_size_ = 6000000;
-  std::map<int64_t, int> response_[response_set_size_];
-  std::mutex response_lock_[response_set_size_];
-  int replica_num_;
-  int id_;
-  int primary_;
-  std::atomic<int> local_id_;
-  std::atomic<int> sum_;
-};
-
-}  // namespace common
-}  // namespace resdb
diff --git a/platform/consensus/ordering/poe/proto/framework/response_manager.cpp b/platform/consensus/ordering/poe/proto/framework/response_manager.cpp
deleted file mode 100644
index 18eb065a..00000000
--- a/platform/consensus/ordering/poe/proto/framework/response_manager.cpp
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "platform/consensus/ordering/common/framework/response_manager.h"
-
-#include <glog/logging.h>
-
-#include "common/utils/utils.h"
-
-namespace resdb {
-namespace common {
-
-using namespace resdb::comm;
-
-ResponseManager::ResponseManager(const ResDBConfig& config,
-                                 ReplicaCommunicator* replica_communicator,
-                                 SignatureVerifier* verifier)
-    : config_(config),
-      replica_communicator_(replica_communicator),
-      batch_queue_("user request"),
-      verifier_(verifier) {
-  stop_ = false;
-  local_id_ = 1;
-
-  if (config_.GetPublicKeyCertificateInfo()
-              .public_key()
-              .public_key_info()
-              .type() == CertificateKeyInfo::CLIENT ||
-      config_.IsTestMode()) {
-    user_req_thread_ = std::thread(&ResponseManager::BatchProposeMsg, this);
-  }
-  global_stats_ = Stats::GetGlobalStats();
-  send_num_ = 0;
-}
-
-ResponseManager::~ResponseManager() {
-  stop_ = true;
-  if (user_req_thread_.joinable()) {
-    user_req_thread_.join();
-  }
-}
-
-// use system info
-int ResponseManager::GetPrimary() { return 1; }
-
-int ResponseManager::NewUserRequest(std::unique_ptr<Context> context,
-                                    std::unique_ptr<Request> user_request) {
-  context->client = nullptr;
-
-  std::unique_ptr<QueueItem> queue_item = std::make_unique<QueueItem>();
-  queue_item->context = std::move(context);
-  queue_item->user_request = std::move(user_request);
-
-  batch_queue_.Push(std::move(queue_item));
-  return 0;
-}
-
-// =================== response ========================
-// handle the response message. If receive f+1 commit messages, send back to the
-// caller.
-int ResponseManager::ProcessResponseMsg(std::unique_ptr<Context> context,
-                                        std::unique_ptr<Request> request) {
-  std::unique_ptr<Request> response;
-  // Add the response message, and use the call back to collect the received
-  // messages.
-  // The callback will be triggered if it received f+1 messages.
-  if (request->ret() == -2) {
-    LOG(ERROR) << "get response fail:" << request->ret();
-    send_num_--;
-    return 0;
-  }
-  CollectorResultCode ret =
-      AddResponseMsg(std::move(request), [&](const Request& request) {
-        response = std::make_unique<Request>(request);
-        return;
-      });
-
-  if (ret == CollectorResultCode::STATE_CHANGED) {
-    BatchUserResponse batch_response;
-    if (batch_response.ParseFromString(response->data())) {
-      SendResponseToClient(batch_response);
-    } else {
-      LOG(ERROR) << "parse response fail:";
-    }
-  }
-  return ret == CollectorResultCode::INVALID ? -2 : 0;
-}
-
-CollectorResultCode ResponseManager::AddResponseMsg(
-    std::unique_ptr<Request> request,
-    std::function<void(const Request&)> response_call_back) {
-  if (request == nullptr) {
-    return CollectorResultCode::INVALID;
-  }
-
-  int type = request->type();
-  uint64_t seq = request->seq();
-  bool done = false;
-  {
-    int idx = seq % response_set_size_;
-    std::unique_lock<std::mutex> lk(response_lock_[idx]);
-    if (response_[idx][seq] == -1) {
-      return CollectorResultCode::OK;
-    }
-    response_[idx][seq]++;
-    if (response_[idx][seq] >= config_.GetMinClientReceiveNum()) {
-      response_[idx][seq] = -1;
-      done = true;
-    }
-  }
-  if (done) {
-    response_call_back(*request);
-    return CollectorResultCode::STATE_CHANGED;
-  }
-  return CollectorResultCode::OK;
-}
-
-void ResponseManager::SendResponseToClient(
-    const BatchUserResponse& batch_response) {
-  uint64_t create_time = batch_response.createtime();
-  uint64_t local_id = batch_response.local_id();
-  if (create_time > 0) {
-    uint64_t run_time = GetCurrentTime() - create_time;
-    global_stats_->AddLatency(run_time);
-  } else {
-    LOG(ERROR) << "seq:" << local_id << " no resp";
-  }
-  send_num_--;
-}
-
-// =================== request ========================
-int ResponseManager::BatchProposeMsg() {
-  LOG(INFO) << "batch wait time:" << config_.ClientBatchWaitTimeMS()
-            << " batch num:" << config_.ClientBatchNum();
-  std::vector<std::unique_ptr<QueueItem>> batch_req;
-  while (!stop_) {
-    if (send_num_ > config_.GetMaxProcessTxn()) {
-      LOG(ERROR) << "send num too high, wait:" << send_num_;
-      usleep(100);
-      continue;
-    }
-    if (batch_req.size() < config_.ClientBatchNum()) {
-      std::unique_ptr<QueueItem> item =
-          batch_queue_.Pop(config_.ClientBatchWaitTimeMS());
-      if (item != nullptr) {
-        batch_req.push_back(std::move(item));
-        if (batch_req.size() < config_.ClientBatchNum()) {
-          continue;
-        }
-      }
-    }
-    if (batch_req.empty()) {
-      continue;
-    }
-    int ret = DoBatch(batch_req);
-    batch_req.clear();
-    if (ret != 0) {
-      Response response;
-      response.set_result(Response::ERROR);
-      for (size_t i = 0; i < batch_req.size(); ++i) {
-        if (batch_req[i]->context && batch_req[i]->context->client) {
-          int ret = batch_req[i]->context->client->SendRawMessage(response);
-          if (ret) {
-            LOG(ERROR) << "send resp" << response.DebugString()
-                       << " fail ret:" << ret;
-          }
-        }
-      }
-    }
-  }
-  return 0;
-}
-
-int ResponseManager::DoBatch(
-    const std::vector<std::unique_ptr<QueueItem>>& batch_req) {
-  auto new_request =
-      NewRequest(Request::TYPE_NEW_TXNS, Request(), config_.GetSelfInfo().id());
-  if (new_request == nullptr) {
-    return -2;
-  }
-  std::vector<std::unique_ptr<Context>> context_list;
-
-  BatchUserRequest batch_request;
-  for (size_t i = 0; i < batch_req.size(); ++i) {
-    BatchUserRequest::UserRequest* req = batch_request.add_user_requests();
-    *req->mutable_request() = *batch_req[i]->user_request.get();
-    *req->mutable_signature() = batch_req[i]->context->signature;
-    req->set_id(i);
-    context_list.push_back(std::move(batch_req[i]->context));
-  }
-
-  if (!config_.IsPerformanceRunning()) {
-    LOG(ERROR) << "add context list:" << new_request->seq()
-               << " list size:" << context_list.size();
-    batch_request.set_local_id(local_id_);
-  }
-  batch_request.set_createtime(GetCurrentTime());
-  std::string data;
-  batch_request.SerializeToString(&data);
-  if (verifier_) {
-    auto signature_or = verifier_->SignMessage(data);
-    if (!signature_or.ok()) {
-      LOG(ERROR) << "Sign message fail";
-      return -2;
-    }
-    *new_request->mutable_data_signature() = *signature_or;
-  }
-
-  batch_request.SerializeToString(new_request->mutable_data());
-  new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data()));
-  new_request->set_proxy_id(config_.GetSelfInfo().id());
-  replica_communicator_->SendMessage(*new_request, GetPrimary());
-  send_num_++;
-  LOG(INFO) << "send msg to primary:" << GetPrimary()
-            << " batch size:" << batch_req.size();
-  return 0;
-}
-
-}  // namespace common
-}  // namespace resdb
diff --git a/platform/consensus/ordering/poe/proto/framework/response_manager.h b/platform/consensus/ordering/poe/proto/framework/response_manager.h
deleted file mode 100644
index fef43dd8..00000000
--- a/platform/consensus/ordering/poe/proto/framework/response_manager.h
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#pragma once
-
-#include "platform/config/resdb_config.h"
-#include "platform/consensus/ordering/common/framework/transaction_utils.h"
-#include "platform/networkstrate/replica_communicator.h"
-#include "platform/networkstrate/server_comm.h"
-#include "platform/statistic/stats.h"
-
-namespace resdb {
-namespace common {
-
-class ResponseManager {
- public:
-  ResponseManager(const ResDBConfig& config,
-                  ReplicaCommunicator* replica_communicator,
-                  SignatureVerifier* verifier);
-
-  ~ResponseManager();
-
-  std::vector<std::unique_ptr<Context>> FetchContextList(uint64_t id);
-
-  int NewUserRequest(std::unique_ptr<Context> context,
-                     std::unique_ptr<Request> user_request);
-
-  int ProcessResponseMsg(std::unique_ptr<Context> context,
-                         std::unique_ptr<Request> request);
-
- private:
-  // Add response messages which will be sent back to the caller
-  // if there are f+1 same messages.
-  comm::CollectorResultCode AddResponseMsg(
-      std::unique_ptr<Request> request,
-      std::function<void(const Request&)> call_back);
-  void SendResponseToClient(const BatchUserResponse& batch_response);
-
-  struct QueueItem {
-    std::unique_ptr<Context> context;
-    std::unique_ptr<Request> user_request;
-  };
-  int DoBatch(const std::vector<std::unique_ptr<QueueItem>>& batch_req);
-  int BatchProposeMsg();
-  int GetPrimary();
-
- private:
-  ResDBConfig config_;
-  ReplicaCommunicator* replica_communicator_;
-  LockFreeQueue<QueueItem> batch_queue_;
-  std::thread user_req_thread_;
-  std::atomic<bool> stop_;
-  uint64_t local_id_ = 0;
-  Stats* global_stats_;
-  std::atomic<int> send_num_;
-  SignatureVerifier* verifier_;
-  static const int response_set_size_ = 6000000;
-  std::map<int64_t, int> response_[response_set_size_];
-  std::mutex response_lock_[response_set_size_];
-};
-
-}  // namespace common
-}  // namespace resdb
diff --git a/platform/consensus/ordering/poe/proto/framework/transaction_utils.cpp b/platform/consensus/ordering/poe/proto/framework/transaction_utils.cpp
deleted file mode 100644
index 9daca020..00000000
--- a/platform/consensus/ordering/poe/proto/framework/transaction_utils.cpp
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "platform/consensus/ordering/common/framework/transaction_utils.h"
-
-namespace resdb {
-namespace comm {
-
-std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
-                                    int sender_id) {
-  auto new_request = std::make_unique<Request>(request);
-  new_request->set_type(type);
-  new_request->set_sender_id(sender_id);
-  return new_request;
-}
-
-std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
-                                    int sender_id, int region_id) {
-  auto new_request = std::make_unique<Request>(request);
-  new_request->set_type(type);
-  new_request->set_sender_id(sender_id);
-  new_request->mutable_region_info()->set_region_id(region_id);
-  return new_request;
-}
-
-}  // namespace comm
-}  // namespace resdb
diff --git a/platform/consensus/ordering/poe/proto/framework/transaction_utils.h b/platform/consensus/ordering/poe/proto/framework/transaction_utils.h
deleted file mode 100644
index 57bb8d35..00000000
--- a/platform/consensus/ordering/poe/proto/framework/transaction_utils.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#pragma once
-#include "platform/proto/replica_info.pb.h"
-#include "platform/proto/resdb.pb.h"
-
-namespace resdb {
-namespace comm {
-
-enum CollectorResultCode {
-  INVALID = -2,
-  OK = 0,
-  STATE_CHANGED = 1,
-};
-
-std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
-                                    int sender_id);
-
-std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
-                                    int sender_id, int region_info);
-}  // namespace comm
-}  // namespace resdb


Reply via email to