This is an automated email from the ASF dual-hosted git repository.
junchao pushed a commit to branch smart_contract_merge
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/smart_contract_merge by this
push:
new 23a3103a merge master
23a3103a is described below
commit 23a3103aecbdba8879f893357f5db7837ac7f803
Author: cjcchen <[email protected]>
AuthorDate: Fri Feb 14 03:23:23 2025 +0800
merge master
---
platform/consensus/ordering/common/BUILD | 29 ---
platform/consensus/ordering/common/algorithm/BUILD | 29 ---
.../ordering/common/algorithm/protocol_base.cpp | 65 -----
.../ordering/common/algorithm/protocol_base.h | 87 -------
platform/consensus/ordering/common/framework/BUILD | 66 -----
.../ordering/common/framework/consensus.cpp | 168 -------------
.../ordering/common/framework/consensus.h | 78 ------
.../common/framework/performance_manager.cpp | 276 ---------------------
.../common/framework/performance_manager.h | 97 --------
.../ordering/common/framework/response_manager.cpp | 236 ------------------
.../ordering/common/framework/response_manager.h | 79 ------
.../common/framework/transaction_utils.cpp | 43 ----
.../ordering/common/framework/transaction_utils.h | 39 ---
.../ordering/common/transaction_utils.cpp | 41 ---
.../consensus/ordering/common/transaction_utils.h | 33 ---
15 files changed, 1366 deletions(-)
diff --git a/platform/consensus/ordering/common/BUILD b/platform/consensus/ordering/common/BUILD
deleted file mode 100644
index 0813e522..00000000
--- a/platform/consensus/ordering/common/BUILD
+++ /dev/null
@@ -1,29 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-package(default_visibility = ["//platform:__subpackages__"])
-
-cc_library(
- name = "transaction_utils",
- srcs = ["transaction_utils.cpp"],
- hdrs = ["transaction_utils.h"],
- visibility = ["//visibility:public"],
- deps = [
- "//platform/proto:resdb_cc_proto",
- ],
-)
diff --git a/platform/consensus/ordering/common/algorithm/BUILD b/platform/consensus/ordering/common/algorithm/BUILD
deleted file mode 100644
index 0d7c5d19..00000000
--- a/platform/consensus/ordering/common/algorithm/BUILD
+++ /dev/null
@@ -1,29 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-package(default_visibility = ["//platform/consensus/ordering:__subpackages__"])
-
-cc_library(
- name = "protocol_base",
- srcs = ["protocol_base.cpp"],
- hdrs = ["protocol_base.h"],
- deps = [
- "//common:comm",
- "//common/crypto:signature_verifier",
- ],
-)
diff --git a/platform/consensus/ordering/common/algorithm/protocol_base.cpp b/platform/consensus/ordering/common/algorithm/protocol_base.cpp
deleted file mode 100644
index cababed4..00000000
--- a/platform/consensus/ordering/common/algorithm/protocol_base.cpp
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "platform/consensus/ordering/common/algorithm/protocol_base.h"
-
-#include <glog/logging.h>
-
-namespace resdb {
-namespace common {
-
-ProtocolBase::ProtocolBase(int id, int f, int total_num,
- SingleCallFuncType single_call,
- BroadcastCallFuncType broadcast_call,
- CommitFuncType commit)
- : id_(id),
- f_(f),
- total_num_(total_num),
- single_call_(single_call),
- broadcast_call_(broadcast_call),
- commit_(commit) {
- stop_ = false;
-}
-
-ProtocolBase::ProtocolBase(int id, int f, int total_num)
- : ProtocolBase(id, f, total_num, nullptr, nullptr, nullptr) {}
-
-ProtocolBase::~ProtocolBase() { Stop(); }
-
-void ProtocolBase::Stop() { stop_ = true; }
-
-bool ProtocolBase::IsStop() { return stop_; }
-
-int ProtocolBase::SendMessage(int msg_type,
- const google::protobuf::Message& msg,
- int node_id) {
- return single_call_(msg_type, msg, node_id);
-}
-
-int ProtocolBase::Broadcast(int msg_type,
- const google::protobuf::Message& msg) {
- return broadcast_call_(msg_type, msg);
-}
-
-int ProtocolBase::Commit(const google::protobuf::Message& msg) {
- return commit_(msg);
-}
-
-} // namespace common
-} // namespace resdb
diff --git a/platform/consensus/ordering/common/algorithm/protocol_base.h b/platform/consensus/ordering/common/algorithm/protocol_base.h
deleted file mode 100644
index f8e47052..00000000
--- a/platform/consensus/ordering/common/algorithm/protocol_base.h
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#pragma once
-
-#include <google/protobuf/message.h>
-
-#include <functional>
-
-#include "common/crypto/signature_verifier.h"
-
-namespace resdb {
-namespace common {
-
-class ProtocolBase {
- public:
- typedef std::function<int(int, const google::protobuf::Message& msg, int)>
- SingleCallFuncType;
- typedef std::function<int(int, const google::protobuf::Message& msg)>
- BroadcastCallFuncType;
- typedef std::function<int(const google::protobuf::Message& msg)>
- CommitFuncType;
-
- ProtocolBase(int id, int f, int total_num, SingleCallFuncType single_call,
- BroadcastCallFuncType broadcast_call, CommitFuncType commit);
-
- ProtocolBase(int id, int f, int total_num);
-
- virtual ~ProtocolBase();
-
- void Stop();
-
- inline void SetSingleCallFunc(SingleCallFuncType single_call) {
- single_call_ = single_call;
- }
-
- inline void SetBroadcastCallFunc(BroadcastCallFuncType broadcast_call) {
- broadcast_call_ = broadcast_call;
- }
-
- inline void SetCommitFunc(CommitFuncType commit_func) {
- commit_ = commit_func;
- }
-
- inline void SetSignatureVerifier(SignatureVerifier* verifier) {
- verifier_ = verifier;
- }
-
- protected:
- int SendMessage(int msg_type, const google::protobuf::Message& msg,
- int node_id);
- int Broadcast(int msg_type, const google::protobuf::Message& msg);
- int Commit(const google::protobuf::Message& msg);
-
- bool IsStop();
-
- protected:
- int id_;
- int f_;
- int total_num_;
- std::function<int(int, const google::protobuf::Message& msg, int)>
- single_call_;
- std::function<int(int, const google::protobuf::Message& msg)> broadcast_call_;
- std::function<int(const google::protobuf::Message& msg)> commit_;
- std::atomic<bool> stop_;
-
- SignatureVerifier* verifier_;
-};
-
-} // namespace common
-} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/BUILD b/platform/consensus/ordering/common/framework/BUILD
deleted file mode 100644
index c56c9260..00000000
--- a/platform/consensus/ordering/common/framework/BUILD
+++ /dev/null
@@ -1,66 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-package(default_visibility = ["//platform/consensus/ordering:__subpackages__"])
-
-cc_library(
- name = "consensus",
- srcs = ["consensus.cpp"],
- hdrs = ["consensus.h"],
- deps = [
- ":performance_manager",
- ":response_manager",
- "//common/utils",
- "//executor/common:transaction_manager",
- "//platform/consensus/execution:transaction_executor",
- "//platform/consensus/ordering/common/algorithm:protocol_base",
- "//platform/networkstrate:consensus_manager",
- ],
-)
-
-cc_library(
- name = "performance_manager",
- srcs = ["performance_manager.cpp"],
- hdrs = ["performance_manager.h"],
- deps = [
- ":transaction_utils",
- "//platform/networkstrate:replica_communicator",
- "//platform/networkstrate:server_comm",
- ],
-)
-
-cc_library(
- name = "response_manager",
- srcs = ["response_manager.cpp"],
- hdrs = ["response_manager.h"],
- deps = [
- ":transaction_utils",
- "//platform/networkstrate:replica_communicator",
- "//platform/networkstrate:server_comm",
- ],
-)
-
-cc_library(
- name = "transaction_utils",
- srcs = ["transaction_utils.cpp"],
- hdrs = ["transaction_utils.h"],
- visibility = ["//visibility:public"],
- deps = [
- "//platform/proto:resdb_cc_proto",
- ],
-)
diff --git a/platform/consensus/ordering/common/framework/consensus.cpp b/platform/consensus/ordering/common/framework/consensus.cpp
deleted file mode 100644
index 93e00cc8..00000000
--- a/platform/consensus/ordering/common/framework/consensus.cpp
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "platform/consensus/ordering/common/framework/consensus.h"
-
-#include <glog/logging.h>
-#include <unistd.h>
-
-#include "common/utils/utils.h"
-
-namespace resdb {
-namespace common {
-
-Consensus::Consensus(const ResDBConfig& config,
- std::unique_ptr<TransactionManager> executor)
- : ConsensusManager(config),
- replica_communicator_(GetBroadCastClient()),
- transaction_executor_(std::make_unique<TransactionExecutor>(
- config,
- [&](std::unique_ptr<Request> request,
- std::unique_ptr<BatchUserResponse> resp_msg) {
- ResponseMsg(*resp_msg);
- },
- nullptr, std::move(executor))) {
- LOG(INFO) << "is running is performance mode:"
- << config_.IsPerformanceRunning();
- is_stop_ = false;
- global_stats_ = Stats::GetGlobalStats();
-}
-
-void Consensus::Init() {
- if (performance_manager_ == nullptr) {
- performance_manager_ =
- config_.IsPerformanceRunning()
- ? std::make_unique<PerformanceManager>(
- config_, GetBroadCastClient(), GetSignatureVerifier())
- : nullptr;
- }
-
- if (response_manager_ == nullptr) {
- response_manager_ =
- !config_.IsPerformanceRunning()
- ? std::make_unique<ResponseManager>(config_, GetBroadCastClient(),
- GetSignatureVerifier())
- : nullptr;
- }
-}
-
-void Consensus::InitProtocol(ProtocolBase* protocol) {
- protocol->SetSingleCallFunc(
- [&](int type, const google::protobuf::Message& msg, int node_id) {
- return SendMsg(type, msg, node_id);
- });
-
- protocol->SetBroadcastCallFunc(
- [&](int type, const google::protobuf::Message& msg) {
- return Broadcast(type, msg);
- });
-
- protocol->SetCommitFunc(
- [&](const google::protobuf::Message& msg) { return CommitMsg(msg); });
-}
-
-Consensus::~Consensus() { is_stop_ = true; }
-
-void Consensus::SetPerformanceManager(
- std::unique_ptr<PerformanceManager> performance_manager) {
- performance_manager_ = std::move(performance_manager);
-}
-
-bool Consensus::IsStop() { return is_stop_; }
-
-void Consensus::SetupPerformanceDataFunc(std::function<std::string()> func) {
- performance_manager_->SetDataFunc(func);
-}
-
-void Consensus::SetCommunicator(ReplicaCommunicator* replica_communicator) {
- replica_communicator_ = replica_communicator;
-}
-
-int Consensus::Broadcast(int type, const google::protobuf::Message& msg) {
- Request request;
- msg.SerializeToString(request.mutable_data());
- request.set_type(Request::TYPE_CUSTOM_CONSENSUS);
- request.set_user_type(type);
- request.set_sender_id(config_.GetSelfInfo().id());
-
- replica_communicator_->BroadCast(request);
- return 0;
-}
-
-int Consensus::SendMsg(int type, const google::protobuf::Message& msg,
- int node_id) {
- Request request;
- msg.SerializeToString(request.mutable_data());
- request.set_type(Request::TYPE_CUSTOM_CONSENSUS);
- request.set_user_type(type);
- request.set_sender_id(config_.GetSelfInfo().id());
- replica_communicator_->SendMessage(request, node_id);
- return 0;
-}
-
-std::vector<ReplicaInfo> Consensus::GetReplicas() {
- return config_.GetReplicaInfos();
-}
-
-int Consensus::CommitMsg(const google::protobuf::Message& txn) { return 0; }
-
-// The implementation of PBFT.
-int Consensus::ConsensusCommit(std::unique_ptr<Context> context,
- std::unique_ptr<Request> request) {
- switch (request->type()) {
- case Request::TYPE_CLIENT_REQUEST:
- if (config_.IsPerformanceRunning()) {
- return performance_manager_->StartEval();
- }
- case Request::TYPE_RESPONSE:
- if (config_.IsPerformanceRunning()) {
- return performance_manager_->ProcessResponseMsg(std::move(context),
- std::move(request));
- }
- case Request::TYPE_NEW_TXNS: {
- return ProcessNewTransaction(std::move(request));
- }
- case Request::TYPE_CUSTOM_CONSENSUS: {
- return ProcessCustomConsensus(std::move(request));
- }
- }
- return 0;
-}
-
-int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) {
- return 0;
-}
-
-int Consensus::ProcessNewTransaction(std::unique_ptr<Request> request) {
- return 0;
-}
-
-int Consensus::ResponseMsg(const BatchUserResponse& batch_resp) {
- Request request;
- request.set_seq(batch_resp.seq());
- request.set_type(Request::TYPE_RESPONSE);
- request.set_sender_id(config_.GetSelfInfo().id());
- request.set_proxy_id(batch_resp.proxy_id());
- batch_resp.SerializeToString(request.mutable_data());
- replica_communicator_->SendMessage(request, request.proxy_id());
- return 0;
-}
-
-} // namespace common
-} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/consensus.h b/platform/consensus/ordering/common/framework/consensus.h
deleted file mode 100644
index 2f2884b8..00000000
--- a/platform/consensus/ordering/common/framework/consensus.h
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#pragma once
-
-#include "executor/common/transaction_manager.h"
-#include "platform/consensus/execution/transaction_executor.h"
-#include "platform/consensus/ordering/common/algorithm/protocol_base.h"
-#include "platform/consensus/ordering/common/framework/performance_manager.h"
-#include "platform/consensus/ordering/common/framework/response_manager.h"
-#include "platform/networkstrate/consensus_manager.h"
-
-namespace resdb {
-namespace common {
-
-class Consensus : public ConsensusManager {
- public:
- Consensus(const ResDBConfig& config,
- std::unique_ptr<TransactionManager> transaction_manager);
- virtual ~Consensus();
-
- int ConsensusCommit(std::unique_ptr<Context> context,
- std::unique_ptr<Request> request) override;
- std::vector<ReplicaInfo> GetReplicas() override;
-
- void SetupPerformanceDataFunc(std::function<std::string()> func);
-
- void SetCommunicator(ReplicaCommunicator* replica_communicator);
-
- void InitProtocol(ProtocolBase* protocol);
-
- protected:
- virtual int ProcessCustomConsensus(std::unique_ptr<Request> request);
- virtual int ProcessNewTransaction(std::unique_ptr<Request> request);
- virtual int CommitMsg(const google::protobuf::Message& msg);
-
- protected:
- int SendMsg(int type, const google::protobuf::Message& msg, int node_id);
- int Broadcast(int type, const google::protobuf::Message& msg);
- int ResponseMsg(const BatchUserResponse& batch_resp);
- void AsyncSend();
- bool IsStop();
-
- protected:
- void Init();
- void SetPerformanceManager(
- std::unique_ptr<PerformanceManager> performance_manger);
-
- protected:
- ReplicaCommunicator* replica_communicator_;
- std::unique_ptr<PerformanceManager> performance_manager_;
- std::unique_ptr<ResponseManager> response_manager_;
- std::unique_ptr<TransactionExecutor> transaction_executor_;
- Stats* global_stats_;
-
- LockFreeQueue<BatchUserResponse> resp_queue_;
- std::vector<std::thread> send_thread_;
- bool is_stop_;
-};
-
-} // namespace common
-} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/performance_manager.cpp b/platform/consensus/ordering/common/framework/performance_manager.cpp
deleted file mode 100644
index c07088f1..00000000
--- a/platform/consensus/ordering/common/framework/performance_manager.cpp
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "platform/consensus/ordering/common/framework/performance_manager.h"
-
-#include <glog/logging.h>
-
-#include "common/utils/utils.h"
-
-namespace resdb {
-namespace common {
-
-using comm::CollectorResultCode;
-
-PerformanceManager::PerformanceManager(
- const ResDBConfig& config, ReplicaCommunicator* replica_communicator,
- SignatureVerifier* verifier)
- : config_(config),
- replica_communicator_(replica_communicator),
- batch_queue_("user request"),
- verifier_(verifier) {
- stop_ = false;
- eval_started_ = false;
- eval_ready_future_ = eval_ready_promise_.get_future();
- if (config_.GetPublicKeyCertificateInfo()
- .public_key()
- .public_key_info()
- .type() == CertificateKeyInfo::CLIENT) {
- for (int i = 0; i < 1; ++i) {
- user_req_thread_[i] =
- std::thread(&PerformanceManager::BatchProposeMsg, this);
- }
- }
- global_stats_ = Stats::GetGlobalStats();
- send_num_ = 0;
- total_num_ = 0;
- replica_num_ = config_.GetReplicaNum();
- id_ = config_.GetSelfInfo().id();
- primary_ = id_ % replica_num_;
- if (primary_ == 0) primary_ = replica_num_;
- local_id_ = 1;
- sum_ = 0;
-}
-
-PerformanceManager::~PerformanceManager() {
- stop_ = true;
- for (int i = 0; i < 16; ++i) {
- if (user_req_thread_[i].joinable()) {
- user_req_thread_[i].join();
- }
- }
-}
-
-int PerformanceManager::GetPrimary() { return primary_; }
-
-std::unique_ptr<Request> PerformanceManager::GenerateUserRequest() {
- std::unique_ptr<Request> request = std::make_unique<Request>();
- request->set_data(data_func_());
- return request;
-}
-
-void PerformanceManager::SetDataFunc(std::function<std::string()> func) {
- data_func_ = std::move(func);
-}
-
-int PerformanceManager::StartEval() {
- if (eval_started_) {
- return 0;
- }
- eval_started_ = true;
- for (int i = 0; i < 100000000; ++i) {
- std::unique_ptr<QueueItem> queue_item = std::make_unique<QueueItem>();
- queue_item->context = nullptr;
- queue_item->user_request = GenerateUserRequest();
- batch_queue_.Push(std::move(queue_item));
- if (i == 2000000) {
- eval_ready_promise_.set_value(true);
- }
- }
- LOG(WARNING) << "start eval done";
- return 0;
-}
-
-// =================== response ========================
-// handle the response message. If receive f+1 commit messages, send back to the
-// user.
-int PerformanceManager::ProcessResponseMsg(std::unique_ptr<Context> context,
- std::unique_ptr<Request> request) {
- std::unique_ptr<Request> response;
- // Add the response message, and use the call back to collect the received
- // messages.
- // The callback will be triggered if it received f+1 messages.
- if (request->ret() == -2) {
- // LOG(INFO) << "get response fail:" << request->ret();
- send_num_--;
- return 0;
- }
-
- // LOG(INFO) << "get response:" << request->seq() << "
- // sender:"<<request->sender_id();
- std::unique_ptr<BatchUserResponse> batch_response = nullptr;
- CollectorResultCode ret = AddResponseMsg(
- std::move(request), [&](std::unique_ptr<BatchUserResponse> request) {
- batch_response = std::move(request);
- return;
- });
-
- if (ret == CollectorResultCode::STATE_CHANGED) {
- assert(batch_response);
- SendResponseToClient(*batch_response);
- }
- return ret == CollectorResultCode::INVALID ? -2 : 0;
-}
-
-CollectorResultCode PerformanceManager::AddResponseMsg(
- std::unique_ptr<Request> request,
- std::function<void(std::unique_ptr<BatchUserResponse>)>
- response_call_back) {
- if (request == nullptr) {
- return CollectorResultCode::INVALID;
- }
-
- std::unique_ptr<BatchUserResponse> batch_response =
- std::make_unique<BatchUserResponse>();
- if (!batch_response->ParseFromString(request->data())) {
- LOG(ERROR) << "parse response fail:" << request->data().size()
- << " seq:" << request->seq();
- return CollectorResultCode::INVALID;
- }
-
- uint64_t seq = batch_response->local_id();
-
- bool done = false;
- {
- int idx = seq % response_set_size_;
- std::unique_lock<std::mutex> lk(response_lock_[idx]);
- if (response_[idx].find(seq) == response_[idx].end()) {
- return CollectorResultCode::OK;
- }
- response_[idx][seq]++;
- if (response_[idx][seq] >= config_.GetMinClientReceiveNum()) {
- response_[idx].erase(response_[idx].find(seq));
- done = true;
- }
- }
- if (done) {
- response_call_back(std::move(batch_response));
- return CollectorResultCode::STATE_CHANGED;
- }
- return CollectorResultCode::OK;
-}
-
-void PerformanceManager::SendResponseToClient(
- const BatchUserResponse& batch_response) {
- uint64_t create_time = batch_response.createtime();
- if (create_time > 0) {
- uint64_t run_time = GetCurrentTime() - create_time;
- LOG(ERROR) << "receive current:" << GetCurrentTime()
- << " create time:" << create_time << " run time:" << run_time
- << " local id:" << batch_response.local_id();
- global_stats_->AddLatency(run_time);
- }
- send_num_--;
-}
-
-// =================== request ========================
-int PerformanceManager::BatchProposeMsg() {
- LOG(WARNING) << "batch wait time:" << config_.ClientBatchWaitTimeMS()
- << " batch num:" << config_.ClientBatchNum()
- << " max txn:" << config_.GetMaxProcessTxn();
- std::vector<std::unique_ptr<QueueItem>> batch_req;
- eval_ready_future_.get();
- bool start = false;
- while (!stop_) {
- if (send_num_ > config_.GetMaxProcessTxn()) {
- usleep(100000);
- continue;
- }
- if (batch_req.size() < config_.ClientBatchNum()) {
- std::unique_ptr<QueueItem> item =
- batch_queue_.Pop(config_.ClientBatchWaitTimeMS());
- if (item == nullptr) {
- if (start) {
- LOG(ERROR) << "no data";
- }
- continue;
- }
- batch_req.push_back(std::move(item));
- if (batch_req.size() < config_.ClientBatchNum()) {
- continue;
- }
- }
- start = true;
- DoBatch(batch_req);
- batch_req.clear();
- }
- return 0;
-}
-
-int PerformanceManager::DoBatch(
- const std::vector<std::unique_ptr<QueueItem>>& batch_req) {
- auto new_request = comm::NewRequest(Request::TYPE_NEW_TXNS, Request(),
- config_.GetSelfInfo().id());
- if (new_request == nullptr) {
- return -2;
- }
-
- BatchUserRequest batch_request;
- for (size_t i = 0; i < batch_req.size(); ++i) {
- BatchUserRequest::UserRequest* req = batch_request.add_user_requests();
- *req->mutable_request() = *batch_req[i]->user_request.get();
- req->set_id(i);
- }
-
- batch_request.set_local_id(local_id_++);
-
- {
- int idx = batch_request.local_id() % response_set_size_;
- std::unique_lock<std::mutex> lk(response_lock_[idx]);
- response_[idx][batch_request.local_id()]++;
- }
-
- batch_request.set_proxy_id(config_.GetSelfInfo().id());
- batch_request.set_createtime(GetCurrentTime());
- batch_request.SerializeToString(new_request->mutable_data());
- if (verifier_) {
- auto signature_or = verifier_->SignMessage(new_request->data());
- if (!signature_or.ok()) {
- LOG(ERROR) << "Sign message fail";
- return -2;
- }
- *new_request->mutable_data_signature() = *signature_or;
- }
-
- new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data()));
- new_request->set_proxy_id(config_.GetSelfInfo().id());
- new_request->set_user_seq(batch_request.local_id());
-
- SendMessage(*new_request);
-
- global_stats_->BroadCastMsg();
- send_num_++;
- sum_ += batch_req.size();
- if (total_num_++ == 1000000) {
- stop_ = true;
- LOG(WARNING) << "total num is done:" << total_num_;
- }
- if (total_num_ % 1000 == 0) {
- LOG(WARNING) << "total num is :" << total_num_;
- }
- global_stats_->IncClientCall();
- return 0;
-}
-
-void PerformanceManager::SendMessage(const Request& request) {
- replica_communicator_->SendMessage(request, GetPrimary());
-}
-
-} // namespace common
-} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/performance_manager.h b/platform/consensus/ordering/common/framework/performance_manager.h
deleted file mode 100644
index a34f0573..00000000
--- a/platform/consensus/ordering/common/framework/performance_manager.h
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#pragma once
-
-#include <future>
-
-#include "platform/config/resdb_config.h"
-#include "platform/consensus/ordering/common/framework/transaction_utils.h"
-#include "platform/networkstrate/replica_communicator.h"
-#include "platform/networkstrate/server_comm.h"
-#include "platform/statistic/stats.h"
-
-namespace resdb {
-namespace common {
-
-class PerformanceManager {
- public:
- PerformanceManager(const ResDBConfig& config,
- ReplicaCommunicator* replica_communicator,
- SignatureVerifier* verifier);
-
- virtual ~PerformanceManager();
-
- int StartEval();
-
- int ProcessResponseMsg(std::unique_ptr<Context> context,
- std::unique_ptr<Request> request);
- void SetDataFunc(std::function<std::string()> func);
-
- protected:
- virtual void SendMessage(const Request& request);
-
- private:
- // Add response messages which will be sent back to the caller
- // if there are f+1 same messages.
- comm::CollectorResultCode AddResponseMsg(
- std::unique_ptr<Request> request,
- std::function<void(std::unique_ptr<BatchUserResponse>)> call_back);
- void SendResponseToClient(const BatchUserResponse& batch_response);
-
- struct QueueItem {
- std::unique_ptr<Context> context;
- std::unique_ptr<Request> user_request;
- };
- int DoBatch(const std::vector<std::unique_ptr<QueueItem>>& batch_req);
- int BatchProposeMsg();
- int GetPrimary();
- std::unique_ptr<Request> GenerateUserRequest();
-
- protected:
- ResDBConfig config_;
- ReplicaCommunicator* replica_communicator_;
-
- private:
- LockFreeQueue<QueueItem> batch_queue_;
- std::thread user_req_thread_[16];
- std::atomic<bool> stop_;
- Stats* global_stats_;
- std::atomic<int> send_num_;
- std::mutex mutex_;
- std::atomic<int> total_num_;
- SignatureVerifier* verifier_;
- SignatureInfo sig_;
- std::function<std::string()> data_func_;
- std::future<bool> eval_ready_future_;
- std::promise<bool> eval_ready_promise_;
- std::atomic<bool> eval_started_;
- std::atomic<int> fail_num_;
- static const int response_set_size_ = 6000000;
- std::map<int64_t, int> response_[response_set_size_];
- std::mutex response_lock_[response_set_size_];
- int replica_num_;
- int id_;
- int primary_;
- std::atomic<int> local_id_;
- std::atomic<int> sum_;
-};
-
-} // namespace common
-} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/response_manager.cpp b/platform/consensus/ordering/common/framework/response_manager.cpp
deleted file mode 100644
index 18eb065a..00000000
--- a/platform/consensus/ordering/common/framework/response_manager.cpp
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "platform/consensus/ordering/common/framework/response_manager.h"
-
-#include <glog/logging.h>
-
-#include "common/utils/utils.h"
-
-namespace resdb {
-namespace common {
-
-using namespace resdb::comm;
-
-ResponseManager::ResponseManager(const ResDBConfig& config,
- ReplicaCommunicator* replica_communicator,
- SignatureVerifier* verifier)
- : config_(config),
- replica_communicator_(replica_communicator),
- batch_queue_("user request"),
- verifier_(verifier) {
- stop_ = false;
- local_id_ = 1;
-
- if (config_.GetPublicKeyCertificateInfo()
- .public_key()
- .public_key_info()
- .type() == CertificateKeyInfo::CLIENT ||
- config_.IsTestMode()) {
- user_req_thread_ = std::thread(&ResponseManager::BatchProposeMsg, this);
- }
- global_stats_ = Stats::GetGlobalStats();
- send_num_ = 0;
-}
-
-ResponseManager::~ResponseManager() {
- stop_ = true;
- if (user_req_thread_.joinable()) {
- user_req_thread_.join();
- }
-}
-
-// use system info
-int ResponseManager::GetPrimary() { return 1; }
-
-int ResponseManager::NewUserRequest(std::unique_ptr<Context> context,
- std::unique_ptr<Request> user_request) {
- context->client = nullptr;
-
- std::unique_ptr<QueueItem> queue_item = std::make_unique<QueueItem>();
- queue_item->context = std::move(context);
- queue_item->user_request = std::move(user_request);
-
- batch_queue_.Push(std::move(queue_item));
- return 0;
-}
-
-// =================== response ========================
-// handle the response message. If receive f+1 commit messages, send back to the
-// caller.
-int ResponseManager::ProcessResponseMsg(std::unique_ptr<Context> context,
- std::unique_ptr<Request> request) {
- std::unique_ptr<Request> response;
- // Add the response message, and use the call back to collect the received
- // messages.
- // The callback will be triggered if it received f+1 messages.
- if (request->ret() == -2) {
- LOG(ERROR) << "get response fail:" << request->ret();
- send_num_--;
- return 0;
- }
- CollectorResultCode ret =
- AddResponseMsg(std::move(request), [&](const Request& request) {
- response = std::make_unique<Request>(request);
- return;
- });
-
- if (ret == CollectorResultCode::STATE_CHANGED) {
- BatchUserResponse batch_response;
- if (batch_response.ParseFromString(response->data())) {
- SendResponseToClient(batch_response);
- } else {
- LOG(ERROR) << "parse response fail:";
- }
- }
- return ret == CollectorResultCode::INVALID ? -2 : 0;
-}
-
-CollectorResultCode ResponseManager::AddResponseMsg(
- std::unique_ptr<Request> request,
- std::function<void(const Request&)> response_call_back) {
- if (request == nullptr) {
- return CollectorResultCode::INVALID;
- }
-
- int type = request->type();
- uint64_t seq = request->seq();
- bool done = false;
- {
- int idx = seq % response_set_size_;
- std::unique_lock<std::mutex> lk(response_lock_[idx]);
- if (response_[idx][seq] == -1) {
- return CollectorResultCode::OK;
- }
- response_[idx][seq]++;
- if (response_[idx][seq] >= config_.GetMinClientReceiveNum()) {
- response_[idx][seq] = -1;
- done = true;
- }
- }
- if (done) {
- response_call_back(*request);
- return CollectorResultCode::STATE_CHANGED;
- }
- return CollectorResultCode::OK;
-}
-
-void ResponseManager::SendResponseToClient(
- const BatchUserResponse& batch_response) {
- uint64_t create_time = batch_response.createtime();
- uint64_t local_id = batch_response.local_id();
- if (create_time > 0) {
- uint64_t run_time = GetCurrentTime() - create_time;
- global_stats_->AddLatency(run_time);
- } else {
- LOG(ERROR) << "seq:" << local_id << " no resp";
- }
- send_num_--;
-}
-
-// =================== request ========================
-int ResponseManager::BatchProposeMsg() {
- LOG(INFO) << "batch wait time:" << config_.ClientBatchWaitTimeMS()
- << " batch num:" << config_.ClientBatchNum();
- std::vector<std::unique_ptr<QueueItem>> batch_req;
- while (!stop_) {
- if (send_num_ > config_.GetMaxProcessTxn()) {
- LOG(ERROR) << "send num too high, wait:" << send_num_;
- usleep(100);
- continue;
- }
- if (batch_req.size() < config_.ClientBatchNum()) {
- std::unique_ptr<QueueItem> item =
- batch_queue_.Pop(config_.ClientBatchWaitTimeMS());
- if (item != nullptr) {
- batch_req.push_back(std::move(item));
- if (batch_req.size() < config_.ClientBatchNum()) {
- continue;
- }
- }
- }
- if (batch_req.empty()) {
- continue;
- }
- int ret = DoBatch(batch_req);
- batch_req.clear();
- if (ret != 0) {
- Response response;
- response.set_result(Response::ERROR);
- for (size_t i = 0; i < batch_req.size(); ++i) {
- if (batch_req[i]->context && batch_req[i]->context->client) {
- int ret = batch_req[i]->context->client->SendRawMessage(response);
- if (ret) {
- LOG(ERROR) << "send resp" << response.DebugString()
- << " fail ret:" << ret;
- }
- }
- }
- }
- }
- return 0;
-}
-
-int ResponseManager::DoBatch(
- const std::vector<std::unique_ptr<QueueItem>>& batch_req) {
- auto new_request =
- NewRequest(Request::TYPE_NEW_TXNS, Request(), config_.GetSelfInfo().id());
- if (new_request == nullptr) {
- return -2;
- }
- std::vector<std::unique_ptr<Context>> context_list;
-
- BatchUserRequest batch_request;
- for (size_t i = 0; i < batch_req.size(); ++i) {
- BatchUserRequest::UserRequest* req = batch_request.add_user_requests();
- *req->mutable_request() = *batch_req[i]->user_request.get();
- *req->mutable_signature() = batch_req[i]->context->signature;
- req->set_id(i);
- context_list.push_back(std::move(batch_req[i]->context));
- }
-
- if (!config_.IsPerformanceRunning()) {
- LOG(ERROR) << "add context list:" << new_request->seq()
- << " list size:" << context_list.size();
- batch_request.set_local_id(local_id_);
- }
- batch_request.set_createtime(GetCurrentTime());
- std::string data;
- batch_request.SerializeToString(&data);
- if (verifier_) {
- auto signature_or = verifier_->SignMessage(data);
- if (!signature_or.ok()) {
- LOG(ERROR) << "Sign message fail";
- return -2;
- }
- *new_request->mutable_data_signature() = *signature_or;
- }
-
- batch_request.SerializeToString(new_request->mutable_data());
- new_request->set_hash(SignatureVerifier::CalculateHash(new_request->data()));
- new_request->set_proxy_id(config_.GetSelfInfo().id());
- replica_communicator_->SendMessage(*new_request, GetPrimary());
- send_num_++;
- LOG(INFO) << "send msg to primary:" << GetPrimary()
- << " batch size:" << batch_req.size();
- return 0;
-}
-
-} // namespace common
-} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/response_manager.h b/platform/consensus/ordering/common/framework/response_manager.h
deleted file mode 100644
index fef43dd8..00000000
--- a/platform/consensus/ordering/common/framework/response_manager.h
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#pragma once
-
-#include "platform/config/resdb_config.h"
-#include "platform/consensus/ordering/common/framework/transaction_utils.h"
-#include "platform/networkstrate/replica_communicator.h"
-#include "platform/networkstrate/server_comm.h"
-#include "platform/statistic/stats.h"
-
-namespace resdb {
-namespace common {
-
-class ResponseManager {
- public:
- ResponseManager(const ResDBConfig& config,
- ReplicaCommunicator* replica_communicator,
- SignatureVerifier* verifier);
-
- ~ResponseManager();
-
- std::vector<std::unique_ptr<Context>> FetchContextList(uint64_t id);
-
- int NewUserRequest(std::unique_ptr<Context> context,
- std::unique_ptr<Request> user_request);
-
- int ProcessResponseMsg(std::unique_ptr<Context> context,
- std::unique_ptr<Request> request);
-
- private:
- // Add response messages which will be sent back to the caller
- // if there are f+1 same messages.
- comm::CollectorResultCode AddResponseMsg(
- std::unique_ptr<Request> request,
- std::function<void(const Request&)> call_back);
- void SendResponseToClient(const BatchUserResponse& batch_response);
-
- struct QueueItem {
- std::unique_ptr<Context> context;
- std::unique_ptr<Request> user_request;
- };
- int DoBatch(const std::vector<std::unique_ptr<QueueItem>>& batch_req);
- int BatchProposeMsg();
- int GetPrimary();
-
- private:
- ResDBConfig config_;
- ReplicaCommunicator* replica_communicator_;
- LockFreeQueue<QueueItem> batch_queue_;
- std::thread user_req_thread_;
- std::atomic<bool> stop_;
- uint64_t local_id_ = 0;
- Stats* global_stats_;
- std::atomic<int> send_num_;
- SignatureVerifier* verifier_;
- static const int response_set_size_ = 6000000;
- std::map<int64_t, int> response_[response_set_size_];
- std::mutex response_lock_[response_set_size_];
-};
-
-} // namespace common
-} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/transaction_utils.cpp b/platform/consensus/ordering/common/framework/transaction_utils.cpp
deleted file mode 100644
index 9daca020..00000000
--- a/platform/consensus/ordering/common/framework/transaction_utils.cpp
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "platform/consensus/ordering/common/framework/transaction_utils.h"
-
-namespace resdb {
-namespace comm {
-
-std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
- int sender_id) {
- auto new_request = std::make_unique<Request>(request);
- new_request->set_type(type);
- new_request->set_sender_id(sender_id);
- return new_request;
-}
-
-std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
- int sender_id, int region_id) {
- auto new_request = std::make_unique<Request>(request);
- new_request->set_type(type);
- new_request->set_sender_id(sender_id);
- new_request->mutable_region_info()->set_region_id(region_id);
- return new_request;
-}
-
-} // namespace comm
-} // namespace resdb
diff --git a/platform/consensus/ordering/common/framework/transaction_utils.h b/platform/consensus/ordering/common/framework/transaction_utils.h
deleted file mode 100644
index 57bb8d35..00000000
--- a/platform/consensus/ordering/common/framework/transaction_utils.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#pragma once
-#include "platform/proto/replica_info.pb.h"
-#include "platform/proto/resdb.pb.h"
-
-namespace resdb {
-namespace comm {
-
-enum CollectorResultCode {
- INVALID = -2,
- OK = 0,
- STATE_CHANGED = 1,
-};
-
-std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
- int sender_id);
-
-std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
- int sender_id, int region_info);
-} // namespace comm
-} // namespace resdb
diff --git a/platform/consensus/ordering/common/transaction_utils.cpp b/platform/consensus/ordering/common/transaction_utils.cpp
deleted file mode 100644
index fea180a0..00000000
--- a/platform/consensus/ordering/common/transaction_utils.cpp
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "platform/consensus/ordering/common/transaction_utils.h"
-
-namespace resdb {
-
-std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
- int sender_id) {
- auto new_request = std::make_unique<Request>(request);
- new_request->set_type(type);
- new_request->set_sender_id(sender_id);
- return new_request;
-}
-
-std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
- int sender_id, int region_id) {
- auto new_request = std::make_unique<Request>(request);
- new_request->set_type(type);
- new_request->set_sender_id(sender_id);
- new_request->mutable_region_info()->set_region_id(region_id);
- return new_request;
-}
-
-} // namespace resdb
diff --git a/platform/consensus/ordering/common/transaction_utils.h b/platform/consensus/ordering/common/transaction_utils.h
deleted file mode 100644
index 151b431c..00000000
--- a/platform/consensus/ordering/common/transaction_utils.h
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#pragma once
-
-#include "platform/proto/replica_info.pb.h"
-#include "platform/proto/resdb.pb.h"
-
-namespace resdb {
-
-std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
- int sender_id);
-
-std::unique_ptr<Request> NewRequest(Request::Type type, const Request& request,
- int sender_id, int region_info);
-
-} // namespace resdb