This is an automated email from the ASF dual-hosted git repository.
junchao pushed a commit to branch smart_contract_merge
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/smart_contract_merge by this
push:
new e5695291 merge master
e5695291 is described below
commit e56952911aa098a391392f744503bfdd8cd04c30
Author: cjcchen <[email protected]>
AuthorDate: Fri Feb 14 03:06:10 2025 +0800
merge master
---
platform/config/resdb_config.cpp | 5 +-
platform/config/resdb_config.h | 15 +-
.../consensus/execution/transaction_executor.cpp | 394 +++++++++++++--
.../consensus/execution/transaction_executor.h | 51 +-
.../consensus/ordering/cassandra/algorithm/BUILD | 76 ---
.../ordering/cassandra/algorithm/cassandra.cpp | 562 ---------------------
.../ordering/cassandra/algorithm/cassandra.h | 114 -----
.../cassandra/algorithm/proposal_graph.cpp | 491 ------------------
.../ordering/cassandra/algorithm/proposal_graph.h | 87 ----
.../cassandra/algorithm/proposal_manager.cpp | 470 -----------------
.../cassandra/algorithm/proposal_manager.h | 73 ---
.../ordering/cassandra/algorithm/proposal_state.h | 16 -
.../ordering/cassandra/algorithm/ranking.cpp | 12 -
.../ordering/cassandra/algorithm/ranking.h | 14 -
.../consensus/ordering/cassandra/framework/BUILD | 16 -
.../ordering/cassandra/framework/consensus.cpp | 171 -------
.../ordering/cassandra/framework/consensus.h | 59 ---
.../cassandra/framework/consensus_test.cpp | 179 -------
platform/consensus/ordering/cassandra/proto/BUILD | 16 -
.../ordering/cassandra/proto/proposal.proto | 119 -----
20 files changed, 427 insertions(+), 2513 deletions(-)
diff --git a/platform/config/resdb_config.cpp b/platform/config/resdb_config.cpp
index 4c3ff954..23129a25 100644
--- a/platform/config/resdb_config.cpp
+++ b/platform/config/resdb_config.cpp
@@ -76,6 +76,9 @@ ResDBConfig::ResDBConfig(const ResConfigData& config_data,
if (config_data_.tcp_batch_num() == 0) {
config_data_.set_tcp_batch_num(100);
}
+ if (config_data_.max_process_txn() == 0) {
+ config_data_.set_max_process_txn(64);
+ }
}
void ResDBConfig::SetConfigData(const ResConfigData& config_data) {
@@ -177,7 +180,7 @@ void ResDBConfig::SetSignatureVerifierEnabled(bool enable_sv) {
}
// Performance setting
-bool ResDBConfig::IsPerformanceRunning() {
+bool ResDBConfig::IsPerformanceRunning() const {
return is_performance_running_ || GetConfigData().is_performance_running();
}
diff --git a/platform/config/resdb_config.h b/platform/config/resdb_config.h
index fb56a9dd..9867c6d5 100644
--- a/platform/config/resdb_config.h
+++ b/platform/config/resdb_config.h
@@ -94,7 +94,7 @@ class ResDBConfig {
void SetSignatureVerifierEnabled(bool enable_sv);
// Performance setting
- bool IsPerformanceRunning();
+ bool IsPerformanceRunning() const;
void RunningPerformance(bool);
bool IsTestMode() const;
@@ -135,15 +135,18 @@ class ResDBConfig {
bool signature_verifier_enabled_ = true;
bool is_performance_running_ = false;
bool is_test_mode_ = false;
- uint32_t max_process_txn_ = 2048;
uint32_t client_batch_wait_time_ms_ = 100; // milliseconds, 0.1s
- uint32_t client_batch_num_ = 100;
uint64_t viewchange_commit_timeout_ms_ =
60000; // default 60s to change viewchange
- uint32_t worker_num_ = 64;
- uint32_t input_worker_num_ = 1;
- uint32_t output_worker_num_ = 1;
+
+ // This is the default settings.
+ // change these parameters in the configuration.
+ uint32_t max_process_txn_ = 64;
+ uint32_t worker_num_ = 16;
+ uint32_t input_worker_num_ = 5;
+ uint32_t output_worker_num_ = 5;
+ uint32_t client_batch_num_ = 100;
};
} // namespace resdb
diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp
index fd24da3a..1f98babd 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -20,6 +20,7 @@
#include "platform/consensus/execution/transaction_executor.h"
#include <glog/logging.h>
+#include "common/utils/utils.h"
namespace resdb {
@@ -35,30 +36,86 @@ TransactionExecutor::TransactionExecutor(
execute_queue_("execute"),
stop_(false),
duplicate_manager_(nullptr) {
+
+ memset(blucket_, 0, sizeof(blucket_));
global_stats_ = Stats::GetGlobalStats();
ordering_thread_ = std::thread(&TransactionExecutor::OrderMessage, this);
- execute_thread_ = std::thread(&TransactionExecutor::ExecuteMessage, this);
+ for (int i = 0; i < execute_thread_num_; ++i) {
+ execute_thread_.push_back(
+ std::thread(&TransactionExecutor::ExecuteMessage, this));
+ }
+
+ for (int i = 0; i < 1; ++i) {
+ prepare_thread_.push_back(
+ std::thread(&TransactionExecutor::PrepareMessage, this));
+ }
if (transaction_manager_ && transaction_manager_->IsOutOfOrder()) {
execute_OOO_thread_ =
std::thread(&TransactionExecutor::ExecuteMessageOutOfOrder, this);
LOG(ERROR) << " is out of order:" << transaction_manager_->IsOutOfOrder();
}
+ gc_thread_ = std::thread(&TransactionExecutor::GCProcess, this);
}
TransactionExecutor::~TransactionExecutor() { Stop(); }
+void TransactionExecutor::RegisterExecute(int64_t seq) {
+ if (execute_thread_num_ == 1) return;
+ int idx = seq % blucket_num_;
+ std::unique_lock<std::mutex> lk(mutex_);
+ // LOG(ERROR)<<"register seq:"<<seq<<" bluck:"<<blucket_[idx];
+ assert(!blucket_[idx] || !(blucket_[idx] ^ 3));
+ blucket_[idx] = 1;
+ // LOG(ERROR)<<"register seq:"<<seq;
+}
+
+void TransactionExecutor::WaitForExecute(int64_t seq) {
+ if (execute_thread_num_ == 1) return;
+ int pre_idx = (seq - 1 + blucket_num_) % blucket_num_;
+
+ while (!IsStop()) {
+ std::unique_lock<std::mutex> lk(mutex_);
+ cv_.wait_for(lk, std::chrono::milliseconds(10000), [&] {
+ return ((blucket_[pre_idx] & 2) || !blucket_[pre_idx]);
+ });
+ if ((blucket_[pre_idx] & 2) || !blucket_[pre_idx]) {
+ break;
+ }
+ }
+ // LOG(ERROR)<<"wait for :"<<seq<<" done";
+}
+
+void TransactionExecutor::FinishExecute(int64_t seq) {
+ if (execute_thread_num_ == 1) return;
+ int idx = seq % blucket_num_;
+ std::unique_lock<std::mutex> lk(mutex_);
+ // LOG(ERROR)<<"finish :"<<seq<<" done";
+ blucket_[idx] = 3;
+ cv_.notify_all();
+}
+
void TransactionExecutor::Stop() {
stop_ = true;
if (ordering_thread_.joinable()) {
ordering_thread_.join();
}
- if (execute_thread_.joinable()) {
- execute_thread_.join();
+ for (auto& th : execute_thread_) {
+ if (th.joinable()) {
+ th.join();
+ }
+ }
+ for (auto& th : prepare_thread_) {
+ if (th.joinable()) {
+ th.join();
+ }
}
if (execute_OOO_thread_.joinable()) {
execute_OOO_thread_.join();
}
+ if (gc_thread_.joinable()) {
+ gc_thread_.join();
+ }
}
Storage* TransactionExecutor::GetStorage() {
@@ -144,6 +201,12 @@ void TransactionExecutor::OrderMessage() {
return;
}
+void TransactionExecutor::AddExecuteMessage(std::unique_ptr<Request> message) {
+ global_stats_->IncCommit();
+ message->set_commit_time(GetCurrentTime());
+ execute_queue_.Push(std::move(message));
+}
+
void TransactionExecutor::ExecuteMessage() {
while (!IsStop()) {
auto message = execute_queue_.Pop();
@@ -154,7 +217,11 @@ void TransactionExecutor::ExecuteMessage() {
if (transaction_manager_ && transaction_manager_->IsOutOfOrder()) {
need_execute = false;
}
+ int64_t start_time = GetCurrentTime();
+ global_stats_->AddExecuteQueuingLatency(start_time - message->commit_time());
Execute(std::move(message), need_execute);
+ int64_t end_time = GetCurrentTime();
+ global_stats_->AddExecuteLatency(end_time-start_time);
}
}
@@ -184,10 +251,7 @@ void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
// LOG(INFO) << " get request batch size:"
// << batch_request.user_requests_size()<<" proxy
// id:"<<request->proxy_id();
- // std::unique_ptr<BatchUserResponse> batch_response =
- // std::make_unique<BatchUserResponse>();
std::unique_ptr<BatchUserResponse> response;
- global_stats_->GetTransactionDetails(batch_request);
if (transaction_manager_) {
response = transaction_manager_->ExecuteBatch(batch_request);
}
@@ -198,51 +262,321 @@ void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
void TransactionExecutor::Execute(std::unique_ptr<Request> request,
bool need_execute) {
- // Execute the request, then send the response back to the user.
- BatchUserRequest batch_request;
- if (!batch_request.ParseFromString(request->data())) {
- LOG(ERROR) << "parse data fail";
+ uint64_t uid = request->uid();
+ int64_t seq = request->seq();
+ RegisterExecute(request->seq());
+ std::unique_ptr<BatchUserRequest> batch_request = nullptr;
+ std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>> data;
+ std::vector<std::unique_ptr<google::protobuf::Message>> * data_p = nullptr;
+ BatchUserRequest* batch_request_p = nullptr;
+
+ bool need_gc = false;
+
+ if (request->uid() > 0) {
+ bool current_f = SetFlag(uid, Start_Execute);
+ if (!current_f) {
+ global_stats_->ConsumeTransactions(1);
+ std::unique_ptr<std::future<int>> data_f = GetFuture(uid);
+ //LOG(ERROR)<<"wait prepare:"<<uid;
+ {
+ data_f->get();
+ }
+ //LOG(ERROR)<<"wait prepare done:"<<uid;
+
+ // prepare is done
+ //LOG(ERROR)<<"prepare is done:"<<uid;
+ {
+ int64_t start_time = GetCurrentTime();
+ std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
+ if(req_[uid % mod][uid] == nullptr){
+ LOG(ERROR)<<"data is empty:"<<uid;
+ }
+ assert(req_[uid % mod][uid] != nullptr);
+ //batch_request = std::move(req_[uid % mod][uid]);
+ batch_request_p = req_[uid % mod][uid].get();
+ auto it = data_[uid % mod].find(uid);
+ if (it != data_[uid % mod].end()) {
+ assert(it->second!=nullptr);
+ data_p = it->second.get();
+ //data = std::move(it->second);
+ }
+ int64_t end_time = GetCurrentTime();
+ if (end_time - start_time > 1000) {
+ LOG(ERROR) << "get data done:" << uid
+ << " wait time:" << (end_time - start_time);
+ }
+ }
+ ClearPromise(uid);
+ need_gc = true;
+ } else {
+ global_stats_->AddNewTransactions(1);
+ //LOG(ERROR)<<"commit start:"<<uid;
+ // LOG(ERROR)<<" prepare not start:"<<uid;
+ }
}
- batch_request.set_seq(request->seq());
- batch_request.set_hash(request->hash());
- batch_request.set_proxy_id(request->proxy_id());
- if (request->has_committed_certs()) {
- *batch_request.mutable_committed_certs() = request->committed_certs();
+
+ // Execute the request, then send the response back to the user.
+ if (batch_request_p == nullptr) {
+ batch_request = std::make_unique<BatchUserRequest>();
+ if (!batch_request->ParseFromString(request->data())) {
+ LOG(ERROR) << "parse data fail";
+ }
+ batch_request->set_hash(request->hash());
+ if (request->has_committed_certs()) {
+ *batch_request->mutable_committed_certs() = request->committed_certs();
+ }
+ batch_request->set_seq(request->seq());
+ batch_request->set_proxy_id(request->proxy_id());
+ batch_request_p = batch_request.get();
+ // LOG(ERROR)<<"get data from req:";
+ } else {
+ assert(batch_request_p);
+ batch_request_p->set_seq(request->seq());
+ batch_request_p->set_proxy_id(request->proxy_id());
+ // LOG(ERROR)<<" get from cache:"<<uid;
}
+ assert(batch_request_p);
// LOG(INFO) << " get request batch size:"
- // << batch_request.user_requests_size()<<" proxy
- // id:"<<request->proxy_id()<<" need execute:"<<need_execute;
- // std::unique_ptr<BatchUserResponse> batch_response =
- // std::make_unique<BatchUserResponse>();
+ // << batch_request.user_requests_size()<<" proxy id:"
+ // <<request->proxy_id()<<" need execute:"<<need_execute;
std::unique_ptr<BatchUserResponse> response;
- global_stats_->GetTransactionDetails(batch_request);
+ // need_execute = false;
if (transaction_manager_ && need_execute) {
- response = transaction_manager_->ExecuteBatch(batch_request);
- }
+ if (execute_thread_num_ == 1) {
+ response = transaction_manager_->ExecuteBatch(*batch_request_p);
+ } else {
+ std::vector<std::unique_ptr<std::string>> response_v;
+
+ if(data_p == nullptr) {
+ int64_t start_time = GetCurrentTime();
+ data = std::move(transaction_manager_->Prepare(*batch_request_p));
+ int64_t end_time = GetCurrentTime();
+ if (end_time - start_time > 10) {
+ // LOG(ERROR)<<"exec data done:"<<uid<<" wait
+ // time:"<<(end_time-start_time);
+ }
+ data_p = data.get();
+ }
- if (duplicate_manager_) {
- duplicate_manager_->AddExecuted(batch_request.hash(), batch_request.seq());
+ WaitForExecute(request->seq());
+ if(data_p->empty() || (*data_p)[0] == nullptr){
+ response = transaction_manager_->ExecuteBatch(*batch_request_p);
+ }
+ else {
+ response_v = transaction_manager_->ExecuteBatchData(*data_p);
+ }
+ FinishExecute(request->seq());
+
+ if(response == nullptr){
+ response = std::make_unique<BatchUserResponse>();
+ for (auto& s : response_v) {
+ response->add_response()->swap(*s);
+ }
+ }
+ }
}
+ // LOG(ERROR)<<" CF = :"<<(cf==1)<<" uid:"<<uid;
- global_stats_->IncTotalRequest(batch_request.user_requests_size());
if (response == nullptr) {
response = std::make_unique<BatchUserResponse>();
}
+ global_stats_->IncTotalRequest(batch_request_p->user_requests_size());
+ response->set_proxy_id(batch_request_p->proxy_id());
+ response->set_createtime(batch_request_p->createtime() + request->queuing_time());
+ response->set_local_id(batch_request_p->local_id());
+ global_stats_->AddCommitDelay(GetCurrentTime()- response->createtime());
- response->set_proxy_id(batch_request.proxy_id());
- response->set_createtime(batch_request.createtime());
- response->set_local_id(batch_request.local_id());
- response->set_hash(batch_request.hash());
+ response->set_seq(request->seq());
- post_exec_func_(std::move(request), std::move(response));
+ if (post_exec_func_) {
+ post_exec_func_(std::move(request), std::move(response));
+ }
global_stats_->IncExecuteDone();
+ if(need_gc){
+ gc_queue_.Push(std::make_unique<int64_t>(uid));
+ }
}
void TransactionExecutor::SetDuplicateManager(DuplicateManager* manager) {
duplicate_manager_ = manager;
}
+
+bool TransactionExecutor::SetFlag(uint64_t uid, int f) {
+ std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
+ auto it = flag_[uid % mod].find(uid);
+ if (it == flag_[uid % mod].end()) {
+ flag_[uid % mod][uid] |= f;
+ // LOG(ERROR)<<"NO FUTURE uid:"<<uid;
+ return true;
+ }
+ assert(it != flag_[uid % mod].end());
+ if (f == Start_Prepare) {
+ if (flag_[uid % mod][uid] & Start_Execute) {
+ return false;
+ }
+ } else if(f == Start_Execute){
+ if (flag_[uid % mod][uid] & End_Prepare) {
+ //if (flag_[uid % mod][uid] & Start_Prepare) {
+ return false;
+ }
+ }
+ flag_[uid % mod][uid] |= f;
+ return true;
+}
+
+void TransactionExecutor::ClearPromise(uint64_t uid) {
+ std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
+ auto it = pre_[uid % mod].find(uid);
+ if (it == pre_[uid % mod].end()) {
+ return;
+ }
+ // LOG(ERROR)<<"CLEAR UID:"<<uid;
+ assert(it != pre_[uid % mod].end());
+ assert(flag_[uid % mod].find(uid) != flag_[uid % mod].end());
+ //assert(data_[uid%mod].find(uid) != data_[uid%mod].end());
+ //assert(req_[uid%mod].find(uid) != req_[uid%mod].end());
+ //data_[uid%mod].erase(data_[uid%mod].find(uid));
+ //req_[uid%mod].erase(req_[uid%mod].find(uid));
+ pre_[uid % mod].erase(it);
+ flag_[uid % mod].erase(flag_[uid % mod].find(uid));
+}
+
+std::promise<int>* TransactionExecutor::GetPromise(uint64_t uid) {
+ std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
+ auto it = pre_[uid % mod].find(uid);
+ if (it == pre_[uid % mod].end()) {
+ return nullptr;
+ }
+ return it->second.get();
+}
+
+std::unique_ptr<std::future<int>> TransactionExecutor::GetFuture(uint64_t uid) {
+ std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
+ auto it = pre_[uid % mod].find(uid);
+ if (it == pre_[uid % mod].end()) {
+ return nullptr;
+ }
+ //return std::move(it->second);
+ // LOG(ERROR)<<"add future:"<<uid;
+ return std::make_unique<std::future<int>>(it->second->get_future());
+}
+
+bool TransactionExecutor::AddFuture(uint64_t uid) {
+ std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
+ auto it = pre_[uid % mod].find(uid);
+ if (it == pre_[uid % mod].end()) {
+ // LOG(ERROR)<<"add future:"<<uid;
+ std::unique_ptr<std::promise<int>> p =
+ std::make_unique<std::promise<int>>();
+ //auto f = std::make_unique<std::future<int>>(p->get_future());
+ pre_[uid % mod][uid] = std::move(p);
+ //pre_f_[uid % mod][uid] = std::move(f);
+ flag_[uid % mod][uid] = 0;
+ return true;
+ }
+ return false;
+}
+
+void TransactionExecutor::Prepare(std::unique_ptr<Request> request) {
+ if (AddFuture(request->uid())) {
+ prepare_queue_.Push(std::move(request));
+ }
+}
+
+void TransactionExecutor::GCProcess() {
+ while (!IsStop()) {
+ std::unique_ptr<int64_t> uid_or = gc_queue_.Pop();
+ if (uid_or== nullptr) {
+ continue;
+ }
+ int64_t uid = *uid_or;
+
+ std::vector<std::unique_ptr<google::protobuf::Message>> * data_p = nullptr;
+ {
+ std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
+ assert(data_[uid%mod].find(uid) != data_[uid%mod].end());
+ data_p = data_[uid%mod][uid].get();
+ }
+
+ for(int i = 0; i < data_p->size(); ++i){
+ (*data_p)[i].release();
+ }
+ (*data_p).clear();
+ {
+ std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
+ assert(req_[uid%mod].find(uid) != req_[uid%mod].end());
+ assert(data_[uid%mod].find(uid) != data_[uid%mod].end());
+ data_[uid%mod].erase(data_[uid%mod].find(uid));
+ req_[uid%mod].erase(req_[uid%mod].find(uid));
+ }
+ }
+}
+
+void TransactionExecutor::PrepareMessage() {
+ while (!IsStop()) {
+ std::unique_ptr<Request> request = prepare_queue_.Pop();
+ if (request == nullptr) {
+ continue;
+ }
+
+ uint64_t uid = request->uid();
+ int current_f = SetFlag(uid, Start_Prepare);
+ if (current_f == 0) {
+ // commit has done
+ // LOG(ERROR)<<" want prepare, commit started:"<<uid;
+// ClearPromise(uid);
+ continue;
+ }
+
+ std::promise<int>* p = GetPromise(uid) ;
+ assert(p);
+ //LOG(ERROR)<<" prepare started:"<<uid;
+
+ // LOG(ERROR)<<" prepare uid:"<<uid;
+
+ // Execute the request, then send the response back to the user.
+ std::unique_ptr<BatchUserRequest> batch_request =
+ std::make_unique<BatchUserRequest>();
+ if (!batch_request->ParseFromString(request->data())) {
+ LOG(ERROR) << "parse data fail";
+ }
+ // batch_request = std::make_unique<BatchUserRequest>();
+ batch_request->set_seq(request->seq());
+ batch_request->set_hash(request->hash());
+ batch_request->set_proxy_id(request->proxy_id());
+ if (request->has_committed_certs()) {
+ *batch_request->mutable_committed_certs() = request->committed_certs();
+ }
+
+ // LOG(ERROR)<<"prepare seq:"<<batch_request->seq()<<" proxy
+ // id:"<<request->proxy_id()<<" local id:"<<batch_request->local_id();
+
+ std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>
+ request_v = transaction_manager_->Prepare(*batch_request);
+ {
+ std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
+ // assert(request_v);
+ // assert(data_[uid%mod].find(uid) == data_[uid%mod].end());
+ data_[uid%mod][uid] = std::move(request_v);
+ req_[uid % mod][uid] = std::move(batch_request);
+ }
+ //LOG(ERROR)<<"set promise:"<<uid;
+ p->set_value(1);
+ {
+ int set_ret = SetFlag(uid, End_Prepare);
+ if (set_ret == 0) {
+ // LOG(ERROR)<<"commit interrupt:"<<uid;
+ //ClearPromise(uid);
+ } else {
+ //LOG(ERROR)<<"prepare done:"<<uid;
+ }
+ }
+ }
+}
+
+
} // namespace resdb
diff --git a/platform/consensus/execution/transaction_executor.h b/platform/consensus/execution/transaction_executor.h
index 2b111164..cbd31c60 100644
--- a/platform/consensus/execution/transaction_executor.h
+++ b/platform/consensus/execution/transaction_executor.h
@@ -62,8 +62,16 @@ class TransactionExecutor {
void SetDuplicateManager(DuplicateManager* manager);
+ void AddExecuteMessage(std::unique_ptr<Request> message);
+
Storage* GetStorage();
+ void RegisterExecute(int64_t seq);
+ void WaitForExecute(int64_t seq);
+ void FinishExecute(int64_t seq);
+
+ void Prepare(std::unique_ptr<Request> request);
+
private:
void Execute(std::unique_ptr<Request> request, bool need_execute = true);
void OnlyExecute(std::unique_ptr<Request> request);
@@ -80,6 +88,15 @@ class TransactionExecutor {
void UpdateMaxExecutedSeq(uint64_t seq);
+ bool SetFlag(uint64_t uid, int f);
+ void ClearPromise(uint64_t uid);
+ void PrepareMessage();
+ void GCProcess();
+
+ bool AddFuture(uint64_t uid);
+ std::unique_ptr<std::future<int>> GetFuture(uint64_t uid);
+ std::promise<int>* GetPromise(uint64_t uid);
+
protected:
ResDBConfig config_;
@@ -91,11 +108,43 @@ class TransactionExecutor {
SystemInfo* system_info_ = nullptr;
std::unique_ptr<TransactionManager> transaction_manager_ = nullptr;
std::map<uint64_t, std::unique_ptr<Request>> candidates_;
- std::thread ordering_thread_, execute_thread_, execute_OOO_thread_;
+ std::thread ordering_thread_, execute_OOO_thread_;
+ std::vector<std::thread> execute_thread_;
LockFreeQueue<Request> commit_queue_, execute_queue_, execute_OOO_queue_;
std::atomic<bool> stop_;
Stats* global_stats_ = nullptr;
DuplicateManager* duplicate_manager_;
+ int execute_thread_num_ = 10;
+ static const int blucket_num_ = 1024;
+ int blucket_[blucket_num_];
+ std::condition_variable cv_;
+ std::mutex mutex_;
+
+ enum PrepareType {
+ Start_Prepare = 1,
+ Start_Execute = 2,
+ End_Prepare = 4,
+ };
+
+
+ std::vector<std::thread> prepare_thread_;
+ std::thread gc_thread_;
+ static const int mod = 2048;
+ std::mutex f_mutex_[mod], fd_mutex_[mod];
+ LockFreeQueue<Request> prepare_queue_;
+ LockFreeQueue<int64_t> gc_queue_;
+ typedef std::unique_ptr<std::promise<int>> PromiseType;
+ std::map<uint64_t, PromiseType> pre_[mod];
+
+ std::map<uint64_t, std::unique_ptr<std::future<int>>> pre_f_[mod];
+ std::map<uint64_t, int> flag_[mod];
+
+ std::map<uint64_t, std::unique_ptr<BatchUserRequest>> req_[mod];
+ std::unordered_map<
+ uint64_t,
+ std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>>
+ data_[mod];
+
};
} // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/BUILD b/platform/consensus/ordering/cassandra/algorithm/BUILD
deleted file mode 100644
index 1d86e0fe..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/BUILD
+++ /dev/null
@@ -1,76 +0,0 @@
-package(default_visibility = ["//platform/consensus/ordering/cassandra:__subpackages__"])
-
-cc_library(
- name = "proposal_state",
- hdrs = ["proposal_state.h"],
-)
-
-cc_library(
- name = "proposal_manager",
- srcs = ["proposal_manager.cpp"],
- hdrs = ["proposal_manager.h"],
- deps = [
- ":proposal_graph",
- "//common:comm",
- "//platform/statistic:stats",
- "//common/crypto:signature_verifier",
- "//common/utils",
- "//platform/consensus/ordering/cassandra/proto:proposal_cc_proto",
- ],
-)
-
-cc_library(
- name = "ranking",
- srcs = ["ranking.cpp"],
- hdrs = ["ranking.h"],
- deps = [
- "//common:comm",
- ],
-)
-
-cc_library(
- name = "proposal_graph",
- srcs = ["proposal_graph.cpp"],
- hdrs = ["proposal_graph.h"],
- deps = [
- ":ranking",
- ":proposal_state",
- "//platform/statistic:stats",
- "//common:comm",
- "//common/utils",
- "//platform/consensus/ordering/cassandra/proto:proposal_cc_proto",
- ],
-)
-
-cc_library(
- name = "cassandra",
- srcs = ["cassandra.cpp"],
- hdrs = ["cassandra.h"],
- deps = [
- ":proposal_graph",
- ":proposal_manager",
- "//platform/statistic:stats",
- "//common:comm",
- "//common/crypto:signature_verifier",
- "//platform/consensus/ordering/common/algorithm:protocol_base",
- "//platform/common/queue:lock_free_queue",
- ],
-)
-
-cc_test(
- name = "proposal_graph_test",
- srcs = ["proposal_graph_test.cpp"],
- deps = [
- ":proposal_graph",
- "//common/test:test_main",
- ],
-)
-
-cc_test(
- name = "cassandra_test",
- srcs = ["cassandra_test.cpp"],
- deps = [
- ":cassandra",
- "//common/test:test_main",
- ],
-)
diff --git a/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp b/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp
deleted file mode 100644
index 04a81bb2..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/cassandra.cpp
+++ /dev/null
@@ -1,562 +0,0 @@
-#include "platform/consensus/ordering/cassandra/algorithm/cassandra.h"
-
-#include <glog/logging.h>
-
-#include "common/crypto/signature_verifier.h"
-#include "common/utils/utils.h"
-
-namespace resdb {
-namespace cassandra {
-namespace cassandra_recv {
-
-Cassandra::Cassandra(int id, int f, int total_num, SignatureVerifier* verifier)
- : ProtocolBase(id, f, total_num), verifier_(verifier) {
-
- LOG(ERROR) << "get proposal graph";
- id_ = id;
- total_num_ = total_num;
- f_ = f;
- is_stop_ = false;
- timeout_ms_ = 60000;
- local_txn_id_ = 1;
- local_proposal_id_ = 1;
- batch_size_ = 10;
-
- recv_num_ = 0;
- execute_num_ = 0;
- executed_ = 0;
- committed_num_ = 0;
- precommitted_num_ = 0;
- execute_id_ = 1;
-
- graph_ = std::make_unique<ProposalGraph>(f_);
- proposal_manager_ = std::make_unique<ProposalManager>(id, graph_.get());
-
- graph_->SetCommitCallBack(
- [&](const Proposal& proposal) { CommitProposal(proposal); });
-
- Reset();
-
- consensus_thread_ = std::thread(&Cassandra::AsyncConsensus, this);
-
- block_thread_ = std::thread(&Cassandra::BroadcastTxn, this);
-
- commit_thread_ = std::thread(&Cassandra::AsyncCommit, this);
-
- global_stats_ = Stats::GetGlobalStats();
-
- prepare_thread_ = std::thread(&Cassandra::AsyncPrepare, this);
-}
-
-Cassandra::~Cassandra() {
- is_stop_ = true;
- if (consensus_thread_.joinable()) {
- consensus_thread_.join();
- }
- if (commit_thread_.joinable()) {
- commit_thread_.join();
- }
- if (prepare_thread_.joinable()) {
- prepare_thread_.join();
- }
-}
-
-void Cassandra::SetPrepareFunction(std::function<int(const Transaction&)> prepare){
- prepare_ = prepare;
-}
-
-bool Cassandra::IsStop() { return is_stop_; }
-
-void Cassandra::Reset() {
- // received_num_.clear();
- // state_ = State::NewProposal;
-}
-
-void Cassandra::AsyncConsensus() {
- int height = 0;
- int64_t start_time = GetCurrentTime();
- while (!is_stop_) {
- int next_height = SendTxn(height);
- if (next_height == -1) {
- // usleep(10000);
- proposal_manager_->WaitBlock();
- continue;
- }
- int64_t end_time = GetCurrentTime();
- global_stats_->AddRoundLatency(end_time - start_time);
- start_time = end_time;
- height = next_height;
- WaitVote(height);
- }
-}
-
-bool Cassandra::WaitVote(int height) {
- std::unique_lock<std::mutex> lk(mutex_);
- vote_cv_.wait_for(lk, std::chrono::microseconds(timeout_ms_ * 1000),
- [&] { return can_vote_[height]; });
- if (!can_vote_[height]) {
- LOG(ERROR) << "wait vote time out"
- << " can vote:" << can_vote_[height] << " height:" << height;
- }
- return true;
-}
-
-void Cassandra::AsyncCommit() {
- while (!is_stop_) {
- std::unique_ptr<Proposal> p = execute_queue_.Pop(timeout_ms_ * 1000);
- if (p == nullptr) {
- LOG(ERROR) << "execu timeout";
- continue;
- }
- //LOG(ERROR) << "execute proposal from proposer:" <<
- // p->header().proposer_id()
- // << " id:" << p->header().proposal_id()
- // << " height:" << p->header().height()
- // << " block size:" << p->block_size();
-
- int txn_num = 0;
- for (const Block& block : p->block()) {
- std::unique_lock<std::mutex> lk(mutex_);
- std::unique_ptr<Block> data_block =
- proposal_manager_->GetBlock(block.hash(), p->header().proposer_id());
- //LOG(ERROR)<<"!!!!!!!!! commit proposal from:"
- //<<p->header().proposer_id()<<" txn size:"
- //<<data_block->data().transaction_size()<<" height:"<<p->header().height();
- if (p->header().proposer_id() == id_) {
- execute_num_ += data_block->data().transaction_size();
- // LOG(ERROR) << "recv num:" << recv_num_
- // << " execute num:" << execute_num_
- // << " block id:" << data_block->local_id()
- // << " block delay:" << (GetCurrentTime() -
- // data_block->create_time());
- }
-
- for (Transaction& txn :
- *data_block->mutable_data()->mutable_transaction()) {
- txn.set_id(execute_id_++);
- txn_num++;
- commit_(txn);
- }
- }
- global_stats_->AddCommitTxn(txn_num);
- }
-}
-
-void Cassandra::CommitProposal(const Proposal& p) {
- //LOG(ERROR) << "commit proposal from proposer:" << p.header().proposer_id()
- // << " id:" << p.header().proposal_id()
- // << " height:" << p.header().height()
- // << " block size:" << p.block_size();
- if (p.block_size() == 0) {
- return;
- }
- // proposal_manager_->ClearProposal(p);
- committed_num_++;
- int64_t commit_time = GetCurrentTime() - p.create_time();
- global_stats_->AddCommitLatency(commit_time);
- // LOG(ERROR) << "commit num:" << committed_num_
- // << " commit delay:" << GetCurrentTime() - p.create_time();
- execute_queue_.Push(std::make_unique<Proposal>(p));
-}
-
-void Cassandra::AsyncPrepare() {
- while (!is_stop_) {
- std::unique_ptr<Proposal> p = prepare_queue_.Pop();
- if (p == nullptr) {
- //LOG(ERROR) << "execu timeout";
- continue;
- }
- //LOG(ERROR)<<"prepare block from:"<<p->header().proposer_id() <<" id:"<<p->header().proposal_id();
- int id = 0;
- for (const Block& block : p->block()) {
- std::unique_lock<std::mutex> lkx(mutex_);
- Block* data_block = proposal_manager_->GetBlockSnap(
- block.hash(), p->header().proposer_id());
- if (data_block == nullptr) {
- continue;
- }
-
- for (Transaction& txn :
- *data_block->mutable_data()->mutable_transaction()) {
- long long uid = (((long long)p->header().proposer_id() << 50) |
- (long long)(p->header().proposal_id()) << 20 | id++);
- txn.set_uid(uid);
- prepare_(txn);
- }
- }
-
- // LOG(ERROR)<<" weak proposal:"<<p->weak_proposals().hash_size();
- for (const std::string& hash : p->weak_proposals().hash()) {
- const Proposal* pre_p = graph_->GetProposalInfo(hash);
- if (pre_p == nullptr) {
- continue;
- }
- //LOG(ERROR)<<"prepare weak block from:"<<pre_p->header().proposer_id()
- //<<" id:"<<pre_p->header().proposal_id();
- for (const Block& block : pre_p->block()) {
- std::unique_lock<std::mutex> lkx(mutex_);
- Block* data_block = proposal_manager_->GetBlockSnap(
- block.hash(), pre_p->header().proposer_id());
- if (data_block == nullptr) {
- continue;
- }
-
- for (Transaction& txn :
- *data_block->mutable_data()->mutable_transaction()) {
- long long uid = (((long long)pre_p->header().proposer_id() << 50) |
- (long long)(pre_p->header().proposal_id()) << 20 | id++);
- txn.set_uid(uid);
- prepare_(txn);
- }
- }
- }
- //LOG(ERROR) << "prepare done";
- }
-}
-
-void Cassandra::PrepareProposal(const Proposal& p) {
- if (p.block_size() == 0) {
- return;
- }
- prepare_queue_.Push(std::make_unique<Proposal>(p));
-}
-
-bool Cassandra::ReceiveTransaction(std::unique_ptr<Transaction> txn) {
- // LOG(ERROR)<<"recv txn:";
- txn->set_create_time(GetCurrentTime());
- txns_.Push(std::move(txn));
- recv_num_++;
- return true;
-}
-
-void Cassandra::BroadcastTxn() {
- std::vector<std::unique_ptr<Transaction>> txns;
- int num = 0;
- while (!IsStop()) {
- std::unique_ptr<Transaction> txn = txns_.Pop();
- if (txn == nullptr) {
- continue;
- }
- txn->set_queuing_time(GetCurrentTime()-txn->create_time());
- global_stats_->AddQueuingLatency(GetCurrentTime()-txn->create_time());
- //LOG(ERROR)<<"get txn, proxy id:"<<txn->proxy_id()<<" hash:"<<txn->hash();
- txns.push_back(std::move(txn));
- /*
- if (txns.size() < batch_size_) {
- continue;
- }
- */
-
- for(int i = 1; i < batch_size_; ++i){
- std::unique_ptr<Transaction> txn = txns_.Pop(0);
- if(txn == nullptr){
- break;
- }
- //LOG(ERROR)<<"get txn, proxy id:"<<txn->proxy_id()<<" hash:"<<txn->hash();
- txn->set_queuing_time(GetCurrentTime()-txn->create_time());
- global_stats_->AddQueuingLatency(GetCurrentTime()-txn->create_time());
- txns.push_back(std::move(txn));
- }
-
- std::unique_ptr<Block> block = proposal_manager_->MakeBlock(txns);
- assert(block != nullptr);
- //LOG(ERROR)<<" send block:"<<block->local_id();
- Broadcast(MessageType::NewBlocks, *block);
- proposal_manager_->AddLocalBlock(std::move(block));
- txns.clear();
- }
-}
-
-void Cassandra::ReceiveBlock(std::unique_ptr<Block> block) {
- // std::unique_lock<std::mutex> lk(g_mutex_);
- //LOG(ERROR)<<"recv block from:"<<block->sender_id()<<" block id:"<<block->local_id();
- BlockACK block_ack;
- block_ack.set_hash(block->hash());
- block_ack.set_sender_id(block->sender_id());
- block_ack.set_local_id(block->local_id());
- block_ack.set_responder(id_);
-
- // std::unique_lock<std::mutex> lk(mutex_);
- proposal_manager_->AddBlock(std::move(block));
- //LOG(ERROR)<<" send block to:"<<block_ack.sender_id();
- SendMessage(MessageType::CMD_BlockACK, block_ack, block_ack.sender_id());
-}
-
-void Cassandra::ReceiveBlockACK(std::unique_ptr<BlockACK> block) {
- //LOG(ERROR)<<"recv block ack:"<<block->local_id()<<" from:"<<block->responder();
- assert(block->sender_id() == id_);
- // std::unique_lock<std::mutex> lkx(g_mutex_);
- std::unique_lock<std::mutex> lk(block_mutex_);
- if (received_.find(block->local_id()) != received_.end()) {
- return;
- }
- block_ack_[block->local_id()].insert(block->responder());
- //LOG(ERROR)<<"recv block ack:"<<block->local_id()
- //<<" from:"<<block->responder()<< " num:"<<block_ack_[block->local_id()].size();
- if (block_ack_[block->local_id()].size() >= 2 * f_ + 1 &&
- block_ack_[block->local_id()].find(id_) !=
- block_ack_[block->local_id()].end()) {
- // std::unique_lock<std::mutex> lk(mutex_);
- proposal_manager_->BlockReady(block->hash(), block->local_id());
- received_.insert(block->local_id());
- }
-}
-
-int Cassandra::SendTxn(int round) {
- std::unique_ptr<Proposal> proposal = nullptr;
- // LOG(ERROR)<<"send:"<<round;
- {
- // std::unique_lock<std::mutex> lkx(g_mutex_);
- round++;
- std::unique_lock<std::mutex> lk(mutex_);
- int current_round = proposal_manager_->CurrentRound();
- //LOG(ERROR)<<"current round:"<<current_round<<" send round:"<<round;
- assert(current_round < round);
-
- proposal = proposal_manager_->GenerateProposal(round, start_);
- if (proposal == nullptr) {
- LOG(ERROR) << "no transactions";
- if (start_ == false) {
- return -1;
- }
- }
-
- if(!proposal->header().prehash().empty()){
- const Proposal* pre_p =
- graph_->GetProposalInfo(proposal->header().prehash());
- assert(pre_p != nullptr);
- PrepareProposal(*pre_p);
- }
- }
-
- //LOG(ERROR)<<"his size:"<<proposal->history_size();
- for (const auto& his : proposal->history()) {
- int sender = his.sender();
- int id = his.id();
- const std::string& hash = his.hash();
- int state = his.state();
- if (state != ProposalState::New) {
- continue;
- }
- const Proposal* p = graph_->GetProposalInfo(hash);
- assert(p);
- for (const auto& b : p->block()) {
- bool ret = proposal_manager_->ContainBlock(b.hash(), b.sender_id());
- //LOG(ERROR) << "sender:" << sender << " id:" << his.id()
- // << " block id:" << b.local_id()
- // << " block sender:" << b.sender_id() << " ret:" << ret
- // << " state:" << state;
- assert(ret);
- }
- }
- proposal_manager_->AddLocalProposal(*proposal);
-
- //LOG(ERROR) << "====== bc proposal block size:" << proposal->block_size()
- //<< " round:" << round
- //<< " id:" << proposal->header().proposal_id()
- //<< " weak links:"<< proposal->weak_proposals().hash_size();
-
- Broadcast(MessageType::NewProposal, *proposal);
-
- assert(proposal->header().height() == round);
- //current_round_ = round;
- return proposal->header().height();
-}
-
-void Cassandra::SendBlock(const BlockQuery& block) {
- // std::unique_lock<std::mutex> lk(g_mutex_);
- // std::unique_lock<std::mutex> lk(mutex_);
- //LOG(ERROR)<<" query block from:"<<block.sender()<<" block id:"<<block.local_id();
- const std::string& hash = block.hash();
- const Block* block_resp = proposal_manager_->QueryBlock(hash);
- assert(block_resp != nullptr);
- //LOG(ERROR) << "send block :" << block.local_id() << " to :" << block.sender();
- SendMessage(MessageType::NewBlocks, *block_resp, block.sender());
-}
-
-void Cassandra::AskBlock(const Block& block) {
- BlockQuery query;
- query.set_hash(block.hash());
- query.set_proposer(block.sender_id());
- query.set_local_id(block.local_id());
- query.set_sender(id_);
- //LOG(ERROR) << "ask block from:" << block.sender_id();
- SendMessage(MessageType::CMD_BlockQuery, query, block.sender_id());
-}
-
-void Cassandra::AskProposal(const Proposal& proposal) {
- ProposalQuery query;
- query.set_hash(proposal.header().hash());
- query.set_proposer(proposal.header().proposer_id());
- query.set_id(proposal.header().proposal_id());
- query.set_sender(id_);
- //LOG(ERROR) << "ask proposal from:" << proposal.header().proposer_id();
- SendMessage(MessageType::CMD_ProposalQuery, query, proposal.header().proposer_id());
-}
-
-void Cassandra::SendProposal(const ProposalQuery& query) {
- std::unique_lock<std::mutex> lk(g_mutex_);
- // std::unique_lock<std::mutex> plk(mutex_);
- // std::unique_lock<std::mutex> lk(mutex_);
- //LOG(ERROR) << "!!!!! recv query proposal:" << query.id()
- // << " from :" << query.sender();
- const std::string& hash = query.hash();
- std::unique_ptr<ProposalQueryResp> resp =
- proposal_manager_->QueryProposal(hash);
- assert(resp != nullptr);
- //LOG(ERROR) << "!!!!! send proposal:" << query.id()
- // << " to :" << query.sender();
- SendMessage(MessageType::CMD_ProposalQueryResponse, *resp, query.sender());
-}
-
-void Cassandra::ReceiveProposalQueryResp(const ProposalQueryResp& resp) {
- // std::unique_lock<std::mutex> lkx(g_mutex_);
- std::unique_lock<std::mutex> lk(mutex_);
- //LOG(ERROR) << "!!!!! recv proposal query resp";
- proposal_manager_->VerifyProposal(resp);
-}
-
-bool Cassandra::ReceiveProposal(std::unique_ptr<Proposal> proposal) {
- // std::unique_lock<std::mutex> lk(g_mutex_);
- {
- // LOG(ERROR)<<"recv proposal";
- std::unique_lock<std::mutex> lk(mutex_);
- const Proposal* pre_p =
- graph_->GetProposalInfo(proposal->header().prehash());
- if (pre_p == nullptr) {
- //LOG(ERROR) << "receive proposal from :"
- // << proposal->header().proposer_id()
- // << " id:" << proposal->header().proposal_id() << "no pre:";
- if(!proposal->header().prehash().empty()) {
- if (proposal->header().height() > graph_->GetCurrentHeight()) {
- future_proposal_[proposal->header().height()].push_back(
- std::move(proposal));
- return true;
- }
- }
- assert(proposal->header().prehash().empty());
- } else {
- //LOG(ERROR) << "receive proposal from :"
- // << proposal->header().proposer_id()
- // << " id:" << proposal->header().proposal_id()
- // << " pre:" << pre_p->header().proposer_id()
- // << " pre id:" << pre_p->header().proposal_id();
- }
-
- /*
- if(proposal->header().height() >=200 && proposal->header().height() <205){
- if(!((proposal->header().proposer_id() <= 23 && id_ <=23)
- || (proposal->header().proposer_id() > 23 && id_ >23))) {
- //if(proposal->header().proposer_id() % 2 != id_%2){
- return true;
- }
- }
- */
-
- if (proposal->header().height() > graph_->GetCurrentHeight()) {
- future_proposal_[proposal->header().height()].push_back(
- std::move(proposal));
- return true;
- }
- AddProposal(*proposal);
-
- auto it = future_proposal_.find(graph_->GetCurrentHeight());
- if (it != future_proposal_.end()) {
- for (auto& p : it->second) {
- AddProposal(*p);
- }
- future_proposal_.erase(it);
- }
- }
- // LOG(ERROR)<<"receive proposal done";
- return true;
-}
-
-bool Cassandra::AddProposal(const Proposal& proposal) {
- int ret = proposal_manager_->VerifyProposal(proposal);
- if (ret != 0) {
- if (ret == 2) {
- LOG(ERROR) << "verify proposal fail";
- AskProposal(proposal);
- }
- return false;
- }
-
- for (const Block& block : proposal.block()) {
- bool ret = false;;
- for(int i = 0; i< 5; ++i){
- ret = proposal_manager_->ContainBlock(block);
- if (ret == false) {
- LOG(ERROR) << "======== block from:" << block.sender_id()
- << " block id:" << block.local_id() << " not exist";
- usleep(1000);
- //AskBlock(block);
- continue;
- }
- else {
- break;
- }
- }
- assert(ret);
- }
-
- {
- std::unique_lock<std::mutex> lk(g_mutex_);
- // LOG(ERROR) << "add proposal";
- int v_ret = graph_->AddProposal(proposal);
- if (v_ret != 0) {
- LOG(ERROR) << "add proposal fail, ret:" << v_ret;
- if (v_ret == 2) {
- // miss history
- // AskProposal(proposal);
- }
- // TrySendRecoveery(proposal);
- return false;
- }
-
- if (proposal.header().proposer_id() == id_) {
- proposal_manager_->RemoveLocalProposal(proposal.header().hash());
- }
- }
-
- received_num_[proposal.header().height()].insert(
- proposal.header().proposer_id());
- //LOG(ERROR) << "received current height:" << graph_->GetCurrentHeight()
- // << " proposal height:" << proposal.header().height()
- // << " num:" << received_num_[graph_->GetCurrentHeight()].size()
- // << " from:" << proposal.header().proposer_id()
- // << " last vote:" << last_vote_;
- if (received_num_[graph_->GetCurrentHeight()].size() == total_num_) {
- if (last_vote_ < graph_->GetCurrentHeight()) {
- last_vote_ = graph_->GetCurrentHeight();
- can_vote_[graph_->GetCurrentHeight()] = true;
- vote_cv_.notify_all();
- //LOG(ERROR) << "can vote:";
- }
- }
- // LOG(ERROR)<<"recv done";
-
- std::vector<std::unique_ptr<Proposal>> future_g = graph_->GetNotFound(
- proposal.header().height() + 1, proposal.header().hash());
- if (future_g.size() > 0) {
- // LOG(ERROR) << "get future size:" << future_g.size();
- for (auto& it : future_g) {
- if (!graph_->AddProposal(*it)) {
- LOG(ERROR) << "add future proposal fail";
- // TrySendRecoveery(proposal);
- continue;
- }
-
- received_num_[it->header().height()].insert(it->header().proposer_id());
- //LOG(ERROR) << "received current height:" << graph_->GetCurrentHeight()
- // << " num:" << received_num_[graph_->GetCurrentHeight()].size()
- // << " from:" << it->header().proposer_id()
- // << " last vote:" << last_vote_;
- }
- }
- return true;
-}
-
-} // namespace cassandra_recv
-} // namespace cassandra
-} // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/cassandra.h b/platform/consensus/ordering/cassandra/algorithm/cassandra.h
deleted file mode 100644
index b4a5e221..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/cassandra.h
+++ /dev/null
@@ -1,114 +0,0 @@
-#pragma once
-
-#include <deque>
-#include <map>
-#include <queue>
-#include <thread>
-
-#include "platform/common/queue/lock_free_queue.h"
-#include "platform/consensus/ordering/common/algorithm/protocol_base.h"
-#include "platform/consensus/ordering/cassandra/algorithm/proposal_graph.h"
-#include "platform/consensus/ordering/cassandra/algorithm/proposal_manager.h"
-#include "platform/consensus/ordering/cassandra/proto/proposal.pb.h"
-#include "platform/statistic/stats.h"
-
-namespace resdb {
-namespace cassandra {
-namespace cassandra_recv {
-
-class Cassandra: public common::ProtocolBase {
- public:
- Cassandra(int id, int f, int total_num, SignatureVerifier* verifier);
- ~Cassandra();
-
- void CheckBlock(const std::string& hash, int local_id);
- void ReceiveBlock(std::unique_ptr<Block> block);
- void ReceiveBlockACK(std::unique_ptr<BlockACK> block);
- bool ReceiveTransaction(std::unique_ptr<Transaction> txn);
- bool ReceiveProposal(std::unique_ptr<Proposal> proposal);
- bool ReceiveVote(const VoteMessage& msg);
- bool ReceivePrepare(const VoteMessage& msg);
- int ReceiveRecovery(const CommittedProposals& proposals);
-
- bool ReceiveVoteACK(const VoteMessage& msg);
- bool ReceiveCommit(const VoteMessage& msg);
-
- void AskBlock(const Block& block);
- void SendBlock(const BlockQuery& block);
-
- void AskProposal(const Proposal& proposal);
- void SendProposal(const ProposalQuery& query);
- void ReceiveProposalQueryResp(const ProposalQueryResp& resp);
- void PrepareProposal(const Proposal& p);
-
-
- void SetPrepareFunction(std::function<int(const Transaction&)> prepare);
-
- private:
- bool IsStop();
-
- int SendTxn(int round);
-
- void Commit(const VoteMessage& msg);
- void CommitProposal(const Proposal& p);
-
- void AsyncConsensus();
- void AsyncCommit();
- void AsyncPrepare();
- bool WaitVote(int);
- void WaitCommit();
-
- bool CheckHistory(const Proposal& proposal);
-
- void Reset();
- bool CheckState(MessageType type, ProposalState state);
-
- void TrySendRecoveery(const Proposal& proposal);
- void BroadcastTxn();
- bool AddProposal(const Proposal& proposal);
-
- bool ProcessProposal(std::unique_ptr<Proposal> proposal);
-
- private:
- std::unique_ptr<ProposalGraph> graph_;
- LockFreeQueue<Transaction> txns_;
- std::unique_ptr<ProposalManager> proposal_manager_;
- SignatureVerifier* verifier_;
- std::mutex mutex_, g_mutex_;
- std::map<int, std::set<int>> received_num_;
- // int state_;
- int id_, total_num_, f_, batch_size_;
- std::atomic<int> is_stop_;
- int timeout_ms_;
- int local_txn_id_, local_proposal_id_;
- LockFreeQueue<Proposal> commit_queue_, execute_queue_, prepare_queue_;
- std::thread commit_thread_, consensus_thread_, block_thread_, prepare_thread_;
- std::condition_variable vote_cv_;
- std::map<int, bool> can_vote_;
- std::atomic<int> committed_num_;
- int voting_, start_ = false;
- std::map<int, std::vector<std::unique_ptr<Transaction>>> uncommitted_txn_;
-
- bool use_linear_ = false;
- int recv_num_ = 0;
- int execute_num_ = 0;
- int pending_num_ = 0;
- std::atomic<int> executed_;
- std::atomic<int> precommitted_num_;
- int last_vote_ = 0;
- int execute_id_;
-
- std::mutex block_mutex_;
- std::set<int> received_;
- std::map<int, std::set<int>> block_ack_;
- std::map<int, std::vector<std::unique_ptr<Proposal>>> future_proposal_;
-
- std::function<int(const Transaction&)> prepare_;
- int current_round_;
-
- Stats* global_stats_;
-};
-
-} // namespace cassandra_recv
-} // namespace cassandra
-} // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp b/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp
deleted file mode 100644
index a923da84..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.cpp
+++ /dev/null
@@ -1,491 +0,0 @@
-#include "platform/consensus/ordering/cassandra/algorithm/proposal_graph.h"
-
-#include <glog/logging.h>
-
-#include <queue>
-#include <stack>
-
-#include "common/utils/utils.h"
-
-namespace resdb {
-namespace cassandra {
-namespace cassandra_recv {
-
-std::vector<ProposalState> GetStates() {
- return std::vector<ProposalState>{ProposalState::New, ProposalState::Prepared,
- ProposalState::PreCommit};
-}
-
-ProposalGraph::ProposalGraph(int fault_num) : f_(fault_num) {
- ranking_ = std::make_unique<Ranking>();
- current_height_ = 0;
- global_stats_ = Stats::GetGlobalStats();
-}
-
-void ProposalGraph::IncreaseHeight() {
- //LOG(ERROR) << "increase height:" << current_height_;
- current_height_++;
-}
-
-void ProposalGraph::TryUpgradeHeight(int height) {
- if (last_node_[height].size() > 0) {
- // LOG(ERROR) << "upgrade height:" << height;
- current_height_ = height;
- } else {
- // assert(1 == 0);
- LOG(ERROR) << "need to recovery";
- }
-}
-
-std::string Encode(const std::string& hash) {
- std::string ret;
- for (int i = 0; i < hash.size(); ++i) {
- int x = hash[i];
- ret += std::to_string(x);
- }
- return ret;
-}
-
-void ProposalGraph::AddProposalOnly(const Proposal& proposal) {
- auto it = node_info_.find(proposal.header().hash());
- if (it == node_info_.end()) {
- auto np = std::make_unique<NodeInfo>(proposal);
- node_info_[proposal.header().hash()] = std::move(np);
- //LOG(ERROR) << "add proposal proposer:" << proposal.header().proposer_id()
- // << " id:" << proposal.header().proposal_id()
- // << " hash:" << Encode(proposal.header().hash());
- }
-}
-
-int ProposalGraph::AddProposal(const Proposal& proposal) {
- //LOG(ERROR) << "add proposal height:" << proposal.header().height()
- // << " current height:" << current_height_<<" from:"<<proposal.header().proposer_id()<<" proposal id:"<<proposal.header().proposal_id();
- assert(current_height_ >= latest_commit_.header().height());
- /*
- if (proposal.header().height() < current_height_) {
- LOG(ERROR) << "height not match:" << current_height_
- << " proposal height:" << proposal.header().height();
- return false;
- }
- */
-
- if (proposal.header().height() > current_height_) {
- pending_header_[proposal.header().height()].insert(
- proposal.header().proposer_id());
- } else {
- while (!pending_header_.empty()) {
- if (pending_header_.begin()->first <= current_height_) {
- pending_header_.erase(pending_header_.begin());
- } else {
- break;
- }
- }
- }
-
- if (proposal.header().height() > current_height_ + 1) {
- LOG(ERROR) << "height not match:" << current_height_
- << " proposal height:" << proposal.header().height();
- if (pending_header_[proposal.header().height()].size() >= f_ + 1) {
- TryUpgradeHeight(proposal.header().height());
- }
- return 1;
- }
-
- if (!VerifyParent(proposal)) {
- LOG(ERROR) << "verify parent fail:" << proposal.header().proposer_id()
- << " id:" << proposal.header().proposal_id();
- // assert(1==0);
- return 2;
- }
-
- // LOG(ERROR)<<"history size:"<<proposal.history_size();
- for (const auto& history : proposal.history()) {
- std::string hash = history.hash();
- auto node_it = node_info_.find(hash);
- assert(node_it != node_info_.end());
- /*
- if (node_it == node_info_.end()) {
- LOG(ERROR) << " history node not found";
- return false;
- }
- else {
- LOG(ERROR)<<"find history";
- }
- */
- }
-
- for (const auto& history : proposal.history()) {
- std::string hash = history.hash();
- auto node_it = node_info_.find(hash);
- for (auto s : GetStates()) {
- if (s >= node_it->second->state && s <= history.state()) {
- if (s != history.state()) {
- LOG(ERROR) << "update from higher state";
- }
- node_it->second->votes[s].insert(proposal.header().proposer_id());
- }
- }
- // LOG(ERROR)<<"proposal:("<<node_it->second->proposal.header().proposer_id()
- //<<","<<node_it->second->proposal.header().proposal_id()<<")"
- //<<" history state:"<<history.state()<<"
- //num:"<<node_it->second->votes[history.state()].size();
- CheckState(node_it->second.get(),
- static_cast<resdb::cassandra::ProposalState>(history.state()));
- if (node_it->second->state == ProposalState::PreCommit) {
- Commit(hash);
- }
- //LOG(ERROR)<<"history proposal:("<<node_it->second->proposal.header().proposer_id()
- //<<","<<node_it->second->proposal.header().proposal_id()
- //<<" state:"<<node_it->second->state;
- }
-
- if (proposal.header().height() < current_height_) {
- LOG(ERROR) << "height not match:" << current_height_
- << " proposal height:" << proposal.header().height();
-
- // LOG(ERROR)<<"add proposal proposer:"<<proposal.header().proposer_id()<<"
- // id:"<<proposal.header().proposal_id();
- // g_[proposal.header().prehash()].push_back(proposal.header().hash());
- auto np = std::make_unique<NodeInfo>(proposal);
- new_proposals_[proposal.header().hash()] = &np->proposal;
- node_info_[proposal.header().hash()] = std::move(np);
- last_node_[proposal.header().height()].insert(proposal.header().hash());
-
- return 1;
- } else {
- g_[proposal.header().prehash()].push_back(proposal.header().hash());
- auto np = std::make_unique<NodeInfo>(proposal);
- new_proposals_[proposal.header().hash()] = &np->proposal;
- // LOG(ERROR)<<"add proposal proposer:"<<proposal.header().proposer_id()<<"
- // id:"<<proposal.header().proposal_id()<<"
- // hash:"<<Encode(proposal.header().hash());
- node_info_[proposal.header().hash()] = std::move(np);
- last_node_[proposal.header().height()].insert(proposal.header().hash());
- }
- return 0;
-}
-
-void ProposalGraph::UpgradeState(ProposalState& state) {
- switch (state) {
- case None:
- case New:
- state = Prepared;
- break;
- case Prepared:
- state = PreCommit;
- break;
- default:
- break;
- }
-}
-
-int ProposalGraph::CheckState(NodeInfo* node_info, ProposalState state) {
- // LOG(ERROR) << "node: (" << node_info->proposal.header().proposer_id() <<
- // ","
- // << node_info->proposal.header().proposal_id()
- // << ") state:" << node_info->state
- // << " vote num:" << node_info->votes[node_info->state].size();
-
- while (node_info->votes[node_info->state].size() >= 2 * f_ + 1) {
- UpgradeState(node_info->state);
- // LOG(ERROR) << "========== proposal:("
- // << node_info->proposal.header().proposer_id() << ","
- // << node_info->proposal.header().proposal_id() << ")"
- // << " upgrate state:" << node_info->state << (GetCurrentTime() -
- // node_info->proposal.create_time());
- }
- return true;
-}
-
-void ProposalGraph::Commit(const std::string& hash) {
- auto it = node_info_.find(hash);
- if (it == node_info_.end()) {
- LOG(ERROR) << "node not found, hash:" << hash;
- assert(1 == 0);
- return;
- }
-
- if (it->second->state != ProposalState::PreCommit) {
- LOG(ERROR) << "hash not committed:" << hash;
- assert(1 == 0);
- return;
- }
-
- std::set<std::string> is_main_hash;
- is_main_hash.insert(hash);
-
- int from_proposer = it->second->proposal.header().proposer_id();
- int from_proposal_id = it->second->proposal.header().proposal_id();
- //LOG(ERROR)<<"commit :"<<it->second->proposal.header().proposer_id()<<" id:"<<it->second->proposal.header().proposal_id();
-
- std::vector<std::vector<Proposal*>> commit_p;
- auto bfs = [&]() {
- std::queue<std::string> q;
- q.push(hash);
- while (!q.empty()) {
- std::string c_hash = q.front();
- q.pop();
-
- auto it = node_info_.find(c_hash);
- if (it == node_info_.end()) {
- LOG(ERROR) << "node not found, hash:";
- assert(1 == 0);
- }
-
- Proposal* p = &it->second->proposal;
- if (it->second->state == ProposalState::Committed) {
- // LOG(ERROR)<<" try commit proposal,
- // sender:"<<p->header().proposer_id()
- //<<" proposal id:"<<p->header().proposal_id()<<" has been committed";
- continue;
- }
-
- it->second->state = ProposalState::Committed;
- if (is_main_hash.find(c_hash) != is_main_hash.end()) {
- commit_num_[p->header().proposer_id()]++;
- // LOG(ERROR)<<"commit main node:"<<p->header().proposer_id();
- is_main_hash.insert(p->header().prehash());
- commit_p.push_back(std::vector<Proposal*>());
- }
-
- commit_p.back().push_back(p);
- //LOG(ERROR)<<"commit node:"<<p->header().proposer_id()<<" id:"<<p->header().proposal_id()
- //<<" weak proposal size:"<<p->weak_proposals().hash_size();
- //LOG(ERROR)<<"push p:"<<p->header().proposer_id();
- for (const std::string& w_hash : p->weak_proposals().hash()) {
- auto it = node_info_.find(w_hash);
- if (it == node_info_.end()) {
- LOG(ERROR) << "node not found, hash:";
- assert(1 == 0);
- }
-
- // LOG(ERROR)<<"add weak
- // proposal:"<<it->second->proposal.header().proposer_id()<<"
- // id:"<<it->second->proposal.header().proposal_id();
- q.push(w_hash);
- }
- if (!p->header().prehash().empty()) {
- q.push(p->header().prehash());
- }
- }
- };
-
- bfs();
- if (commit_p.size() > 1) {
- LOG(ERROR) << "commit more hash";
- }
- int block_num = 0;
- for (int i = commit_p.size() - 1; i >= 0; i--) {
- for (int j = 0; j < commit_p[i].size(); ++j) {
- /*
- if (j == 0) {
- LOG(ERROR) << "commmit proposal lead from:"
- << commit_p[i][j]->header().proposer_id()
- << " height:" << commit_p[i][j]->header().height()
- << " size:" << commit_p[i].size();
- }
- */
- //LOG(ERROR) << "commmit proposal:"
- // << commit_p[i][j]->header().proposer_id()
- // << " height:" << commit_p[i][j]->header().height()
- // << " idx:" << j
- // << " delay:" << (GetCurrentTime() - commit_p[i][j]->create_time())
- // << " commit from:"<< from_proposer<<" id:"<<from_proposal_id;
- block_num += commit_p[i][j]->block_size();
- if (commit_callback_) {
- commit_callback_(*commit_p[i][j]);
- }
- }
- }
- global_stats_->AddCommitBlock(block_num);
-
- // TODO clean
- last_node_[it->second->proposal.header().height()].clear();
- latest_commit_ = it->second->proposal;
- it->second->state = ProposalState::Committed;
- // Clear(latest_commit_.header().hash());
-}
-
-std::vector<std::unique_ptr<Proposal>> ProposalGraph::GetNotFound(
- int height, const std::string& hash) {
- auto it = not_found_proposal_.find(height);
- if (it == not_found_proposal_.end()) {
- return std::vector<std::unique_ptr<Proposal>>();
- }
- auto pre_it = it->second.find(hash);
- std::vector<std::unique_ptr<Proposal>> ret;
- if (pre_it != it->second.end()) {
- ret = std::move(pre_it->second);
- it->second.erase(pre_it);
- LOG(ERROR) << "found future height:" << height;
- }
- return ret;
-}
-
-bool ProposalGraph::VerifyParent(const Proposal& proposal) {
- // LOG(ERROR) << "last commit:" << latest_commit_.header().proposal_id()
- // << " current :" << proposal.header().proposal_id()
- // << " height:" << proposal.header().height();
-
- if (proposal.header().prehash() == latest_commit_.header().hash()) {
- return true;
- }
-
- std::string prehash = proposal.header().prehash();
- // LOG(ERROR)<<"prehash:"<<prehash;
-
- auto it = node_info_.find(prehash);
- if (it == node_info_.end()) {
- LOG(ERROR) << "prehash not here";
- not_found_proposal_[proposal.header().height()][proposal.header().prehash()]
- .push_back(std::make_unique<Proposal>(proposal));
- return false;
- } else {
- if (proposal.header().height() !=
- it->second->proposal.header().height() + 1) {
- LOG(ERROR) << "link to invalid proposal, height:"
- << proposal.header().height()
- << " pre height:" << it->second->proposal.header().height();
- return false;
- }
- }
- return true;
-}
-
-void ProposalGraph::UpdateHistory(Proposal* proposal) {
- proposal->mutable_history()->Clear();
- std::string hash = proposal->header().hash();
-
- for (int i = 0; i < 3 && !hash.empty(); ++i) {
- auto node_it = node_info_.find(hash);
- auto his = proposal->add_history();
- his->set_hash(hash);
- his->set_state(node_it->second->state);
- his->set_sender(node_it->second->proposal.header().proposer_id());
- his->set_id(node_it->second->proposal.header().proposal_id());
- hash = node_it->second->proposal.header().prehash();
- }
-}
-
-Proposal* ProposalGraph::GetStrongestProposal() {
- // LOG(ERROR) << "get strong proposal from height:" << current_height_;
- if (last_node_.find(current_height_) == last_node_.end()) {
- LOG(ERROR) << "no data:" << current_height_;
- return nullptr;
- }
-
- NodeInfo* sp = nullptr;
- for (const auto& last_hash : last_node_[current_height_]) {
- NodeInfo* node_info = node_info_[last_hash].get();
- if (sp == nullptr || Compare(*sp, *node_info)) {
- sp = node_info;
- }
- }
-
- UpdateHistory(&sp->proposal);
- //LOG(ERROR) << "get strong proposal from height:" << current_height_ << " ->("
- // << sp->proposal.header().proposer_id() << ","
- // << sp->proposal.header().proposal_id() << ")";
- return &sp->proposal;
-}
-
-bool ProposalGraph::Cmp(int id1, int id2) {
- // LOG(ERROR) << "commit commit num:" << id1 << " " << id2
- // << " commit time:" << commit_num_[id1] << " " <<
- // commit_num_[id2];
- if (commit_num_[id1] + 1 < commit_num_[id2]) {
- return false;
- }
-
- if (commit_num_[id1] > commit_num_[id2] + 1) {
- return true;
- }
- return id1 < id2;
-}
-
-int ProposalGraph::StateScore(const ProposalState& state) {
- // return state == ProposalState::Prepared? 1:0;
- return state;
-}
-
-int ProposalGraph::CompareState(const ProposalState& state1,
- const ProposalState& state2) {
- // LOG(ERROR) << "check state:" << state1 << " " << state2;
- return StateScore(state1) - StateScore(state2);
-}
-
-// p1 < p2
-bool ProposalGraph::Compare(const NodeInfo& p1, const NodeInfo& p2) {
- // LOG(ERROR) << "proposer:" << p1.proposal.header().proposer_id() << " "
- // << p2.proposal.header().proposer_id()
- // << "height:" << p1.proposal.header().height() << " "
- // << p2.proposal.header().height();
- if (p1.proposal.header().height() != p2.proposal.header().height()) {
- return p1.proposal.header().height() < p2.proposal.header().height();
- }
- // LOG(ERROR)<<"proposer:"<<p1.proposal.header().proposer_id()<<"
- // "<<p2.proposal.header().proposer_id();
- if (CompareState(p1.state, p2.state) != 0) {
- return CompareState(p1.state, p2.state) < 0;
- }
-
- if (p1.proposal.header().proposer_id() ==
- p2.proposal.header().proposer_id()) {
- return p1.proposal.header().proposal_id() <
- p2.proposal.header().proposal_id();
- }
-
- return Cmp(p1.proposal.header().proposer_id(),
- p2.proposal.header().proposer_id());
-}
-
-Proposal* ProposalGraph::GetLatestStrongestProposal() {
- Proposal* sp = GetStrongestProposal();
- if (sp == nullptr) {
- if (current_height_ > 0) {
- assert(1 == 0);
- }
- return &latest_commit_;
- }
- // LOG(ERROR) << "====== get strong proposal from:" <<
- // sp->header().proposer_id()
- // << " id:" << sp->header().proposal_id();
- return sp;
-}
-
-ProposalState ProposalGraph::GetProposalState(const std::string& hash) const {
- auto node_it = node_info_.find(hash);
- if (node_it == node_info_.end()) {
- return ProposalState::None;
- }
- return node_it->second->state;
-}
-
-const Proposal* ProposalGraph::GetProposalInfo(const std::string& hash) const {
- auto it = node_info_.find(hash);
- if (it == node_info_.end()) {
- LOG(ERROR) << "hash not found:" << Encode(hash);
- return nullptr;
- }
- return &it->second->proposal;
-}
-
-int ProposalGraph::GetCurrentHeight() { return current_height_; }
-
-std::vector<Proposal*> ProposalGraph::GetNewProposals(int height) {
- std::vector<Proposal*> ps;
- for (auto it : new_proposals_) {
- if (it.second->header().height() >= height) {
- continue;
- }
- ps.push_back(it.second);
- }
- for (Proposal* p : ps) {
- new_proposals_.erase(new_proposals_.find(p->header().hash()));
- }
- return ps;
-}
-
-} // namespace cassandra_recv
-} // namespace cassandra
-} // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.h b/platform/consensus/ordering/cassandra/algorithm/proposal_graph.h
deleted file mode 100644
index ee21834c..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_graph.h
+++ /dev/null
@@ -1,87 +0,0 @@
-#pragma once
-
-#include <map>
-
-#include "platform/consensus/ordering/cassandra/algorithm/proposal_state.h"
-#include "platform/consensus/ordering/cassandra/algorithm/ranking.h"
-#include "platform/consensus/ordering/cassandra/proto/proposal.pb.h"
-#include "platform/statistic/stats.h"
-
-namespace resdb {
-namespace cassandra {
-namespace cassandra_recv {
-
-class ProposalGraph {
- public:
- ProposalGraph(int fault_num);
- inline void SetCommitCallBack(std::function<void(const Proposal&)> func) {
- commit_callback_ = func;
- }
-
- int AddProposal(const Proposal& proposal);
- void AddProposalOnly(const Proposal& proposal);
-
- Proposal* GetLatestStrongestProposal();
- const Proposal* GetProposalInfo(const std::string& hash) const;
-
- int GetCurrentHeight();
-
- void Clear(const std::string& hash);
- void IncreaseHeight();
- ProposalState GetProposalState(const std::string& hash) const;
-
- std::vector<std::unique_ptr<Proposal>> GetNotFound(int height,
- const std::string& hash);
-
- std::vector<Proposal*> GetNewProposals(int height);
-
- private:
- struct NodeInfo {
- Proposal proposal;
- ProposalState state;
- int score;
- int is_main;
- // std::set<int> received_num[5];
- std::map<int, std::set<int>> votes;
-
- NodeInfo(const Proposal& proposal)
- : proposal(proposal), state(ProposalState::New), score(0), is_main(0) {}
- };
-
- bool VerifyParent(const Proposal& proposal);
-
- bool Compare(const NodeInfo& p1, const NodeInfo& p2);
- bool Cmp(int id1, int id2);
- int StateScore(const ProposalState& state);
- int CompareState(const ProposalState& state1, const ProposalState& state2);
-
- Proposal* GetStrongestProposal();
-
- void UpdateHistory(Proposal* proposal);
- int CheckState(NodeInfo* node_info, ProposalState state);
- void UpgradeState(ProposalState& state);
- void TryUpgradeHeight(int height);
-
- void Commit(const std::string& hash);
-
- private:
- Proposal latest_commit_;
- std::map<std::string, std::vector<std::string>> g_;
- std::map<std::string, std::unique_ptr<NodeInfo>> node_info_;
- std::map<std::string, std::vector<VoteMessage>> not_found_;
- std::unique_ptr<Ranking> ranking_;
- std::map<int, int> commit_num_;
- std::map<int, std::set<std::string>> last_node_;
- int current_height_;
- uint32_t f_;
- std::function<void(const Proposal&)> commit_callback_;
- std::map<int, std::set<int>> pending_header_;
- std::map<int, std::map<std::string, std::vector<std::unique_ptr<Proposal>>>>
- not_found_proposal_;
- std::map<std::string, Proposal*> new_proposals_;
- Stats* global_stats_;
-};
-
-} // namespace cassandra_recv
-} // namespace cassandra
-} // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp
deleted file mode 100644
index bcca39d9..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.cpp
+++ /dev/null
@@ -1,470 +0,0 @@
-#include "platform/consensus/ordering/cassandra/algorithm/proposal_manager.h"
-
-#include <glog/logging.h>
-
-#include "common/crypto/signature_verifier.h"
-#include "common/utils/utils.h"
-
-namespace resdb {
-namespace cassandra {
-namespace cassandra_recv {
-
-namespace {
-std::string Encode(const std::string& hash) {
- std::string ret;
- for (int i = 0; i < hash.size(); ++i) {
- int x = hash[i];
- ret += std::to_string(x);
- }
- return ret;
-}
-
-}
-
-ProposalManager::ProposalManager(int32_t id, ProposalGraph* graph)
- : id_(id), graph_(graph) {
- local_proposal_id_ = 1;
- global_stats_ = Stats::GetGlobalStats();
-}
-
-int ProposalManager::CurrentRound() { return graph_->GetCurrentHeight(); }
-
-std::unique_ptr<Block> ProposalManager::MakeBlock(
- std::vector<std::unique_ptr<Transaction>>& txns) {
- auto block = std::make_unique<Block>();
- Block::BlockData* data = block->mutable_data();
- for (const auto& txn : txns) {
- *data->add_transaction() = *txn;
- }
-
- std::string data_str;
- data->SerializeToString(&data_str);
- std::string hash = SignatureVerifier::CalculateHash(data_str);
- block->set_hash(hash);
- block->set_sender_id(id_);
- block->set_create_time(GetCurrentTime());
- block->set_local_id(local_block_id_++);
- // LOG(ERROR)<<"make block time:"<<block->create_time();
- return block;
-}
-
-void ProposalManager::AddLocalBlock(std::unique_ptr<Block> block) {
- std::unique_lock<std::mutex> lk(mutex_);
- //LOG(ERROR)<<"add local block :"<<block->local_id();
- blocks_candidates_[block->local_id()] = std::move(block);
-}
-
-void ProposalManager::BlockReady(const std::string& hash, int local_id) {
- std::unique_lock<std::mutex> lk(mutex_);
- //LOG(ERROR)<<"ready block:"<<local_id;
- auto it = blocks_candidates_.find(local_id);
- assert(it != blocks_candidates_.end());
- assert(it->second->hash() == hash);
- blocks_.push_back(std::move(it->second));
- blocks_candidates_.erase(it);
- // Notify();
- notify_.notify_all();
-}
-
-void ProposalManager::AddBlock(std::unique_ptr<Block> block) {
- std::unique_lock<std::mutex> lk(p_mutex_);
- int sender = block->sender_id();
- int block_id = block->local_id();
- //LOG(ERROR)<<"add block from sender:"<<sender<<" id:"<<block_id<<" blocksize:"<<pending_blocks_[sender].size()<<" hash:"<<Encode(block->hash());
- pending_blocks_[sender][block->hash()] = std::move(block);
-}
-
-bool ProposalManager::ContainBlock(const std::string& hash, int sender) {
- std::unique_lock<std::mutex> lk(p_mutex_);
- auto bit = pending_blocks_[sender].find(hash);
- return bit != pending_blocks_[sender].end();
-}
-
-bool ProposalManager::ContainBlock(const Block& block) {
- std::unique_lock<std::mutex> lk(p_mutex_);
- int sender = block.sender_id();
- const std::string& hash = block.hash();
- auto bit = pending_blocks_[sender].find(hash);
- return bit != pending_blocks_[sender].end();
-}
-
-const Block* ProposalManager::QueryBlock(const std::string& hash) {
- std::unique_lock<std::mutex> lk(p_mutex_);
- auto bit = pending_blocks_[id_].find(hash);
- assert(bit != pending_blocks_[id_].end());
- return bit->second.get();
-}
-
-Block* ProposalManager::GetBlockSnap(const std::string& hash, int sender) {
- std::unique_lock<std::mutex> lk(p_mutex_);
-
- //LOG(ERROR)<<" sender:"<<sender<<" block size:"<<pending_blocks_[sender].size();
- auto it = pending_blocks_[sender].find(hash);
- if (it == pending_blocks_[sender].end()) {
- LOG(ERROR) << "block from sender:" << sender << " not found"<<" pending size:"<<pending_blocks_[sender].size();
- return nullptr;
- }
- assert(it != pending_blocks_[sender].end());
- // LOG(ERROR)<<"get block: sender:"<<sender<<" id:"<<block->local_id();
- return it->second.get();
-}
-
-std::unique_ptr<Block> ProposalManager::GetBlock(const std::string& hash,
- int sender) {
- bool need_wait = false;
- while (true) {
- if (need_wait) {
- usleep(1000);
- }
- std::unique_lock<std::mutex> lk(p_mutex_);
- auto it = pending_blocks_[sender].find(hash);
- if (it == pending_blocks_[sender].end()) {
- LOG(ERROR) << "block from sender:" << sender << " not found";
- need_wait = true;
- continue;
- }
- assert(it != pending_blocks_[sender].end());
- auto block = std::move(it->second);
- //LOG(ERROR)<<"get block: sender:"<<sender<<" id:"<<block->local_id()<<" removed";
- pending_blocks_[sender].erase(it);
- return block;
- }
-}
-
-void ProposalManager::ClearProposal(const Proposal& p) {}
-
-/*
-void ProposalManager::Notify(){
- //std::unique_lock<std::mutex> lk(notify_mutex_);
- notify_.notify_all();
-}
-
-void ProposalManager::Wait(){
- //std::unique_lock<std::mutex> lk(notify_mutex_);
- notify_.wait_for(lk, std::chrono::microseconds(100000),
- [&] { return !blocks_.empty(); });
-}
-*/
-
-bool ProposalManager::WaitBlock() {
- std::unique_lock<std::mutex> lk(mutex_);
- if (blocks_.empty()) {
- notify_.wait_for(lk, std::chrono::microseconds(1000000),
- [&] { return !blocks_.empty(); });
- //LOG(ERROR) << "wait proposal block size:" << blocks_.size();
- }
- return !blocks_.empty();
-}
-
-std::unique_ptr<Proposal> ProposalManager::GenerateProposal(int round,
- bool need_empty) {
- auto proposal = std::make_unique<Proposal>();
- std::string data;
- {
- //LOG(ERROR) << "generate proposal pending block size:" << blocks_.size();
- std::unique_lock<std::mutex> lk(mutex_);
- if (blocks_.empty() && !need_empty) {
- // return nullptr;
- // Wait();
- // notify_.wait_for(lk, std::chrono::microseconds(100000),
- //[&] { return !blocks_.empty(); });
- // if(blocks_.empty()){
- // return nullptr;
- //}
- return nullptr;
- // LOG(ERROR) << "generate wait proposal block size:" << blocks_.size();
- }
- int max_block = 5;
- int num = 0;
- int64_t current_time = GetCurrentTime();
- proposal->set_create_time(current_time);
- for (auto& block : blocks_) {
- data += block->hash();
- Block* ab = proposal->add_block();
- ab->set_hash(block->hash());
- ab->set_sender_id(block->sender_id());
- ab->set_local_id(block->local_id());
- ab->set_create_time(block->create_time());
- //LOG(ERROR) << " add block:" << block->local_id()
- // << " block delay:" << (current_time - block->create_time())
- // << " block size:" << blocks_.size()
- // << " txn:" << block->data().transaction_size();
- max_block--;
- num++;
- // break;
- if (max_block <= 0) break;
- // blocks_.pop_front();
- // break;
- }
- //LOG(ERROR) << "block num:" << num;
- while (num > 0) {
- blocks_.pop_front();
- num--;
- }
- /*
- if(!blocks_.empty()){
- blocks_.pop_front();
- }
- */
- // blocks_.clear();
- }
-
- Proposal* last = graph_->GetLatestStrongestProposal();
- proposal->mutable_header()->set_height(round);
- //LOG(ERROR) << "get last proposal, proposer:" << last->header().proposer_id()
- // << " id:" << last->header().proposal_id();
-
- graph_->IncreaseHeight();
- if (last != nullptr) {
- assert(last->header().height() + 1 == round);
-
- proposal->mutable_header()->set_prehash(last->header().hash());
- *proposal->mutable_history() = last->history();
- }
-
- {
- std::vector<Proposal*> ps = graph_->GetNewProposals(round);
- // LOG(ERROR)<<"get weak p from round:"<<round<<" size:"<<ps.size();
- for (Proposal* p : ps) {
- if (p->header().height() >= round) {
- LOG(ERROR) << "round invalid:" << round
- << " header:" << p->header().height();
- }
- assert(p->header().height() < round);
-
- if (p->header().hash() == last->header().hash()) {
- continue;
- }
- // LOG(ERROR)<<"add weak p:"<<p->header().height()<<"
- // proposer:"<<p->header().proposer_id();
- *proposal->mutable_weak_proposals()->add_hash() = p->header().hash();
- data += p->header().hash();
- }
- }
-
- proposal->mutable_header()->set_proposer_id(id_);
- proposal->mutable_header()->set_proposal_id(local_proposal_id_++);
- data += std::to_string(proposal->header().proposal_id()) +
- std::to_string(proposal->header().proposer_id()) +
- std::to_string(proposal->header().height());
-
- std::string hash = SignatureVerifier::CalculateHash(data);
- proposal->mutable_header()->set_hash(hash);
- // proposal->set_create_time(GetCurrentTime());
- return proposal;
-}
-
-void ProposalManager::ObtainHistoryProposal(const Proposal* p,
- std::set<std::pair<int, int>>& v,
- std::vector<const Proposal*>& resp,
- int current_height) {
- // LOG(ERROR)<<"obtain proposal history:"<<p->header().proposer_id()<<"
- // id:"<<p->header().proposal_id();
- const std::string& pre_hash = p->header().prehash();
- if (!pre_hash.empty()) {
- const Proposal* next_p = graph_->GetProposalInfo(pre_hash);
- assert(next_p != nullptr);
- int height = next_p->header().height();
- // LOG(ERROR)<<"Obtain next height:"<<height<<"
- // proposer:"<<next_p->header().proposer_id();
- // LOG(ERROR)<<"ask height:"<<current_height<<" current height:"<<height;
- if (current_height - height > 5) {
- return;
- }
- if (v.find(std::make_pair(next_p->header().proposer_id(),
- next_p->header().proposal_id())) != v.end()) {
- return;
- }
- v.insert(std::make_pair(next_p->header().proposer_id(),
- next_p->header().proposal_id()));
- // LOG(ERROR)<<"Obtain height:"<<height<<"
- // proposer:"<<next_p->header().proposer_id()<<"
- // id:"<<next_p->header().proposal_id();
- ObtainHistoryProposal(next_p, v, resp, current_height);
- resp.push_back(next_p);
- }
-
- for (const std::string& hash : p->weak_proposals().hash()) {
- const Proposal* next_p = graph_->GetProposalInfo(hash);
- assert(next_p != nullptr);
- int height = next_p->header().height();
- // LOG(ERROR)<<"ask height:"<<current_height<<" current height:"<<height;
- if (current_height - height > 5) {
- return;
- }
- if (v.find(std::make_pair(next_p->header().proposer_id(),
- next_p->header().proposal_id())) != v.end()) {
- return;
- }
- v.insert(std::make_pair(next_p->header().proposer_id(),
- next_p->header().proposal_id()));
- // LOG(ERROR)<<"Obtain height:"<<height<<"
- // proposer:"<<next_p->header().proposer_id()<<"
- // id:"<<next_p->header().proposal_id();
- ObtainHistoryProposal(next_p, v, resp, current_height);
- resp.push_back(next_p);
- }
-}
-
-int ProposalManager::VerifyProposalHistory(const Proposal* p) {
- // LOG(ERROR)<<"verify proposal, proposer:"<<p->header().proposer_id()<<"
- // id:"<<p->header().proposal_id();
- int ret = 0;
- const std::string& pre_hash = p->header().prehash();
- if (!pre_hash.empty()) {
- const Proposal* next_p = graph_->GetProposalInfo(pre_hash);
- if (next_p == nullptr) {
- LOG(ERROR) << "no prehash:";
- if (tmp_proposal_.find(pre_hash) == tmp_proposal_.end()) {
- LOG(ERROR) << " prehash not found";
- return 2;
- }
- ret = 1;
- auto& it = tmp_proposal_[pre_hash];
- assert(it != nullptr);
- LOG(ERROR) << "Find pre hash in tmp:" << it->header().proposer_id()
- << " id:" << it->header().proposal_id();
- }
- }
-
- for (const std::string& hash : p->weak_proposals().hash()) {
- const Proposal* next_p = graph_->GetProposalInfo(hash);
- if (next_p != nullptr) {
- continue;
- }
- if (tmp_proposal_.find(hash) == tmp_proposal_.end()) {
- LOG(ERROR) << "weak not found";
- return 3;
- }
- ret = 1;
- auto& it = tmp_proposal_[hash];
- assert(it != nullptr);
- LOG(ERROR) << "Find weak in tmp:" << it->header().proposer_id()
- << " id:" << it->header().proposal_id();
- }
- return ret;
-}
-
-std::unique_ptr<ProposalQueryResp> ProposalManager::QueryProposal(
- const std::string& hash) {
- const Proposal* p = graph_->GetProposalInfo(hash);
- if (p == nullptr) {
- p = GetLocalProposal(hash);
- LOG(ERROR) << "get from local:" << (p == nullptr);
- }
- assert(p != nullptr);
- LOG(ERROR) << "query proposal id:" << p->header().proposal_id();
- int current_height = p->header().height();
-
- std::set<std::pair<int, int>> v;
- std::vector<const Proposal*> list;
-
- {
- std::unique_lock<std::mutex> lk(q_mutex_);
- ObtainHistoryProposal(p, v, list, current_height);
- }
-
- std::unique_ptr<ProposalQueryResp> resp =
- std::make_unique<ProposalQueryResp>();
- for (const Proposal* p : list) {
- *resp->add_proposal() = *p;
- }
- return resp;
-}
-
-int ProposalManager::VerifyProposal(const Proposal& proposal) {
- std::unique_lock<std::mutex> lk(q_mutex_);
- int ret = VerifyProposalHistory(&proposal);
- if (ret != 0) {
- LOG(ERROR) << "add to temp proposer:" << proposal.header().proposer_id()
- << " id:" << proposal.header().proposal_id();
- tmp_proposal_[proposal.header().hash()] =
- std::make_unique<Proposal>(proposal);
- }
- return ret;
-}
-
-void ProposalManager::ReleaseTmpProposal(const Proposal& proposal) {
- std::unique_lock<std::mutex> lk(q_mutex_);
- auto it = tmp_proposal_.find(proposal.header().hash());
- if (it != tmp_proposal_.end()) {
- //LOG(ERROR) << "release proposal:" << proposal.header().proposer_id()
- // << " id:" << proposal.header().proposal_id();
- tmp_proposal_.erase(it);
- }
- graph_->AddProposalOnly(proposal);
-}
-
-void ProposalManager::AddLocalProposal(const Proposal& proposal) {
- std::unique_lock<std::mutex> lk(t_mutex_);
- local_proposal_[proposal.header().hash()] =
- std::make_unique<Proposal>(proposal);
- //LOG(ERROR) << "add local id:" << proposal.header().proposal_id();
-}
-
-Proposal* ProposalManager::GetLocalProposal(const std::string& hash) {
- std::unique_lock<std::mutex> lk(t_mutex_);
- auto it = local_proposal_.find(hash);
- if (it == local_proposal_.end()) return nullptr;
- return it->second.get();
-}
-
-void ProposalManager::RemoveLocalProposal(const std::string& hash) {
- std::unique_lock<std::mutex> lk(t_mutex_);
- auto it = local_proposal_.find(hash);
- if (it == local_proposal_.end()) return;
- //LOG(ERROR) << "remove local id:" << it->second->header().proposal_id();
- local_proposal_.erase(it);
-}
-
-int ProposalManager::VerifyProposal(const ProposalQueryResp& resp) {
- std::map<std::pair<int, int>, std::unique_ptr<Proposal>> list;
- //LOG(ERROR) << "verify resp proposal size:" << resp.proposal_size();
- for (auto& it : tmp_proposal_) {
- std::unique_ptr<Proposal> p = std::move(it.second);
- list[std::make_pair(p->header().height(), p->header().proposer_id())] =
- std::move(p);
- }
-
- tmp_proposal_.clear();
-
- for (const Proposal& p : resp.proposal()) {
- // LOG(ERROR)<<"verify resp proposal proposer:"<<p.header().proposer_id()<<"
- // id:"<<p.header().proposal_id();
- if (list.find(std::make_pair(p.header().height(),
- p.header().proposer_id())) != list.end()) {
- continue;
- }
- list[std::make_pair(p.header().height(), p.header().proposer_id())] =
- std::make_unique<Proposal>(p);
- }
-
- std::vector<std::unique_ptr<Proposal>> fail_list;
- while (!list.empty()) {
- auto it = list.begin();
- int ret = VerifyProposalHistory(it->second.get());
- // LOG(ERROR)<<"verify propser:"<<it->second->header().proposer_id()<<"
- // height:"<<it->second->header().height()<<" ret:"<<ret;
- if (ret == 0) {
- ReleaseTmpProposal(*it->second);
- // LOG(ERROR)<<"proposal from:"<<it->second->header().proposer_id()
- // <<" id:"<<it->second->header().proposal_id()<<" activate";
- } else {
- fail_list.push_back(std::move(it->second));
- }
- list.erase(it);
- // assert(ret==0);
- }
- // tmp_proposal_.clear();
- for (auto& p : fail_list) {
- LOG(ERROR) << "proposal from:" << p->header().proposer_id()
- << " id:" << p->header().proposal_id() << " fail";
- assert(p != nullptr);
- tmp_proposal_[p->header().hash()] = std::move(p);
- }
- return 0;
-}
-
-} // namespace cassandra_recv
-} // namespace cassandra
-} // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.h b/platform/consensus/ordering/cassandra/algorithm/proposal_manager.h
deleted file mode 100644
index 4a937470..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_manager.h
+++ /dev/null
@@ -1,73 +0,0 @@
-#pragma once
-
-#include <condition_variable>
-#include <list>
-
-#include "platform/consensus/ordering/cassandra/algorithm/proposal_graph.h"
-#include "platform/consensus/ordering/cassandra/proto/proposal.pb.h"
-#include "platform/statistic/stats.h"
-
-namespace resdb {
-namespace cassandra {
-namespace cassandra_recv {
-
-class ProposalManager {
- public:
- ProposalManager(int32_t id, ProposalGraph* graph);
-
- int VerifyProposal(const Proposal& proposal);
-
- void AddLocalBlock(std::unique_ptr<Block> block);
- void AddBlock(std::unique_ptr<Block> block);
- std::unique_ptr<Block> MakeBlock(
- std::vector<std::unique_ptr<Transaction>>& txn);
-
- std::unique_ptr<Proposal> GenerateProposal(int round, bool need_empty);
- int CurrentRound();
-
- void ClearProposal(const Proposal& p);
- std::unique_ptr<Block> GetBlock(const std::string& hash, int sender);
- Block* GetBlockSnap(const std::string& hash, int sender);
-
- bool ContainBlock(const std::string& hash, int sender);
- bool ContainBlock(const Block& block);
- bool WaitBlock();
- void BlockReady(const std::string& hash, int local_id);
- const Block* QueryBlock(const std::string& hash);
- std::unique_ptr<ProposalQueryResp> QueryProposal(const std::string& hash);
- bool VerifyProposal(const Proposal* proposal);
- void AddTmpProposal(std::unique_ptr<Proposal> proposal);
- void ReleaseTmpProposal(const Proposal& proposal);
- int VerifyProposalHistory(const Proposal* p);
- void AddLocalProposal(const Proposal& proposal);
- void RemoveLocalProposal(const std::string& hash);
-
- int VerifyProposal(const ProposalQueryResp& resp);
-
- private:
- void ObtainHistoryProposal(const Proposal* p,
- std::set<std::pair<int, int>>& v,
- std::vector<const Proposal*>& resp,
- int current_height);
- Proposal* GetLocalProposal(const std::string& hash);
-
- private:
- int32_t id_;
- ProposalGraph* graph_;
- int64_t local_proposal_id_ = 1, local_block_id_ = 1;
-
- std::map<std::string, std::unique_ptr<Block>> pending_blocks_[512];
- std::list<std::unique_ptr<Block>> blocks_;
- std::mutex mutex_, p_mutex_, q_mutex_;
- std::condition_variable notify_;
- std::map<int, std::unique_ptr<Block>> blocks_candidates_;
- std::map<std::string, std::unique_ptr<Proposal>> tmp_proposal_;
-
- std::mutex t_mutex_;
- std::map<std::string, std::unique_ptr<Proposal>> local_proposal_;
- Stats* global_stats_;
-};
-
-} // namespace cassandra_recv
-} // namespace cassandra
-} // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h b/platform/consensus/ordering/cassandra/algorithm/proposal_state.h
deleted file mode 100644
index b25bc293..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/proposal_state.h
+++ /dev/null
@@ -1,16 +0,0 @@
-#pragma once
-
-namespace resdb {
-namespace cassandra {
-
-enum ProposalState {
- None = 0,
- New = 1,
- Voted = 2,
- Prepared = 3,
- PreCommit = 4,
- Committed = 5,
-};
-
-}
-} // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/ranking.cpp b/platform/consensus/ordering/cassandra/algorithm/ranking.cpp
deleted file mode 100644
index d8454e43..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/ranking.cpp
+++ /dev/null
@@ -1,12 +0,0 @@
-
-#include "platform/consensus/ordering/cassandra/algorithm/ranking.h"
-
-namespace resdb {
-namespace cassandra {
-namespace cassandra_recv {
-
-int Ranking::GetRank(int proposer_id) { return proposer_id; }
-
-} // namespace cassandra_recv
-} // namespace cassandra
-} // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/algorithm/ranking.h b/platform/consensus/ordering/cassandra/algorithm/ranking.h
deleted file mode 100644
index d3d2dfb6..00000000
--- a/platform/consensus/ordering/cassandra/algorithm/ranking.h
+++ /dev/null
@@ -1,14 +0,0 @@
-#pragma once
-
-namespace resdb {
-namespace cassandra {
-namespace cassandra_recv {
-
-class Ranking {
- public:
- int GetRank(int proposer_id);
-};
-
-} // namespace cassandra_recv
-} // namespace cassandra
-} // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/framework/BUILD b/platform/consensus/ordering/cassandra/framework/BUILD
deleted file mode 100644
index a764ecff..00000000
--- a/platform/consensus/ordering/cassandra/framework/BUILD
+++ /dev/null
@@ -1,16 +0,0 @@
-package(default_visibility = ["//visibility:private"])
-
-cc_library(
- name = "consensus",
- srcs = ["consensus.cpp"],
- hdrs = ["consensus.h"],
- visibility = [
- "//visibility:public",
- ],
- deps = [
- "//common/utils",
- "//platform/consensus/ordering/common/framework:consensus",
- "//platform/consensus/ordering/cassandra/algorithm:cassandra",
- ],
-)
-
diff --git a/platform/consensus/ordering/cassandra/framework/consensus.cpp b/platform/consensus/ordering/cassandra/framework/consensus.cpp
deleted file mode 100644
index f8ad64fb..00000000
--- a/platform/consensus/ordering/cassandra/framework/consensus.cpp
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Copyright (c) 2019-2022 ExpoLab, UC Davis
- *
- * Permission is hereby granted, free of charge, to any person
- * obtaining a copy of this software and associated documentation
- * files (the "Software"), to deal in the Software without
- * restriction, including without limitation the rights to use,
- * copy, modify, merge, publish, distribute, sublicense, and/or
- * sell copies of the Software, and to permit persons to whom the
- * Software is furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be
- * included in all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
- * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
- * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
- * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
- * DEALINGS IN THE SOFTWARE.
- *
- */
-
-#include "platform/consensus/ordering/cassandra/framework/consensus.h"
-
-#include <glog/logging.h>
-#include <unistd.h>
-
-#include "common/utils/utils.h"
-
-namespace resdb {
-namespace cassandra {
-
-Consensus::Consensus(const ResDBConfig& config,
- std::unique_ptr<TransactionManager> executor)
- : common::Consensus(config, std::move(executor)){
- int total_replicas = config_.GetReplicaNum();
- int f = (total_replicas - 1) / 3;
-
- Init();
-
- start_ = 0;
-
- if (config_.GetPublicKeyCertificateInfo()
- .public_key()
- .public_key_info()
- .type() != CertificateKeyInfo::CLIENT) {
- cassandra_ = std::make_unique<cassandra_recv::Cassandra>(
- config_.GetSelfInfo().id(), f,
- total_replicas, GetSignatureVerifier());
-
- InitProtocol(cassandra_.get());
-
-
- cassandra_->SetPrepareFunction([&](const Transaction& msg) {
- return Prepare(msg);
- });
- }
-}
-
-int Consensus::ProcessCustomConsensus(std::unique_ptr<Request> request) {
- //LOG(ERROR)<<"receive commit:"<<request->type()<<" "<<MessageType_Name(request->user_type());
- if (request->user_type() == MessageType::NewBlocks) {
- std::unique_ptr<Block> block = std::make_unique<Block>();
- if (!block->ParseFromString(request->data())) {
- assert(1 == 0);
- LOG(ERROR) << "parse proposal fail";
- return -1;
- }
- cassandra_->ReceiveBlock(std::move(block));
- return 0;
- } else if (request->user_type() == MessageType::CMD_BlockACK) {
- std::unique_ptr<BlockACK> block_ack = std::make_unique<BlockACK>();
- if (!block_ack->ParseFromString(request->data())) {
- LOG(ERROR) << "parse proposal fail";
- assert(1 == 0);
- return -1;
- }
- cassandra_->ReceiveBlockACK(std::move(block_ack));
- return 0;
-
- } else if (request->user_type() == MessageType::NewProposal) {
- // LOG(ERROR)<<"receive proposal:";
- std::unique_ptr<Proposal> proposal = std::make_unique<Proposal>();
- if (!proposal->ParseFromString(request->data())) {
- LOG(ERROR) << "parse proposal fail";
- assert(1 == 0);
- return -1;
- }
- if (!cassandra_->ReceiveProposal(std::move(proposal))) {
- return -1;
- }
- return 0;
- } else if (request->user_type() == MessageType::CMD_BlockQuery) {
- std::unique_ptr<BlockQuery> block = std::make_unique<BlockQuery>();
- if (!block->ParseFromString(request->data())) {
- assert(1 == 0);
- LOG(ERROR) << "parse proposal fail";
- return -1;
- }
- cassandra_->SendBlock(*block);
- return 0;
- } else if (request->user_type() == MessageType::CMD_ProposalQuery) {
- std::unique_ptr<ProposalQuery> query =
- std::make_unique<ProposalQuery>();
- if (!query->ParseFromString(request->data())) {
- assert(1 == 0);
- LOG(ERROR) << "parse proposal fail";
- return -1;
- }
- cassandra_->SendProposal(*query);
- } else if (request->user_type() ==
- MessageType::CMD_ProposalQueryResponse) {
- std::unique_ptr<ProposalQueryResp> resp =
- std::make_unique<ProposalQueryResp>();
- if (!resp->ParseFromString(request->data())) {
- assert(1 == 0);
- LOG(ERROR) << "parse proposal fail";
- return -1;
- }
- cassandra_->ReceiveProposalQueryResp(*resp);
- }
- return 0;
-}
-
-int Consensus::ProcessNewTransaction(std::unique_ptr<Request> request) {
- std::unique_ptr<Transaction> txn = std::make_unique<Transaction>();
- txn->set_data(request->data());
- txn->set_hash(request->hash());
- txn->set_proxy_id(request->proxy_id());
- //LOG(ERROR)<<"receive txn";
- return cassandra_->ReceiveTransaction(std::move(txn));
-}
-
-int Consensus::CommitMsg(const google::protobuf::Message& msg) {
- return CommitMsgInternal(dynamic_cast<const Transaction&>(msg));
-}
-
-int Consensus::CommitMsgInternal(const Transaction& txn) {
- //LOG(ERROR)<<"commit txn:"<<txn.id()<<" proxy id:"<<txn.proxy_id()<<" uid:"<<txn.uid();
- std::unique_ptr<Request> request = std::make_unique<Request>();
- request->set_queuing_time(txn.queuing_time());
- request->set_data(txn.data());
- request->set_seq(txn.id());
- request->set_uid(txn.uid());
- //if (txn.proposer_id() == config_.GetSelfInfo().id()) {
- request->set_proxy_id(txn.proxy_id());
- // LOG(ERROR)<<"commit txn:"<<txn.id()<<" proxy id:"<<request->uid();
- assert(request->uid()>0);
- //}
-
- transaction_executor_->AddExecuteMessage(std::move(request));
- return 0;
-}
-
-
-int Consensus::Prepare(const Transaction& txn) {
- // LOG(ERROR)<<"prepare txn:"<<txn.id()<<" proxy id:"<<txn.proxy_id()<<"
- // uid:"<<txn.uid();
- std::unique_ptr<Request> request = std::make_unique<Request>();
- request->set_data(txn.data());
- request->set_uid(txn.uid());
- transaction_executor_->Prepare(std::move(request));
- return 0;
-}
-
-
-} // namespace cassandra
-} // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/framework/consensus.h b/platform/consensus/ordering/cassandra/framework/consensus.h
deleted file mode 100644
index a0069c9d..00000000
--- a/platform/consensus/ordering/cassandra/framework/consensus.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (c) 2019-2022 ExpoLab, UC Davis
- *
- * Permission is hereby granted, free of charge, to any person
- * obtaining a copy of this software and associated documentation
- * files (the "Software"), to deal in the Software without
- * restriction, including without limitation the rights to use,
- * copy, modify, merge, publish, distribute, sublicense, and/or
- * sell copies of the Software, and to permit persons to whom the
- * Software is furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be
- * included in all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
- * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
- * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
- * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
- * DEALINGS IN THE SOFTWARE.
- *
- */
-
-#pragma once
-
-#include "executor/common/transaction_manager.h"
-#include "platform/consensus/ordering/common/framework/consensus.h"
-#include "platform/consensus/ordering/cassandra/algorithm/cassandra.h"
-#include "platform/networkstrate/consensus_manager.h"
-
-namespace resdb {
-namespace cassandra {
-
-class Consensus : public common::Consensus {
- public:
- Consensus(const ResDBConfig& config,
- std::unique_ptr<TransactionManager> transaction_manager);
- virtual ~Consensus() = default;
-
- private:
- int ProcessCustomConsensus(std::unique_ptr<Request> request) override;
- int ProcessNewTransaction(std::unique_ptr<Request> request) override;
- int CommitMsg(const google::protobuf::Message& msg) override;
- int CommitMsgInternal(const Transaction& txn);
-
- int Prepare(const Transaction& txn);
-
- protected:
- std::unique_ptr<cassandra_recv::Cassandra> cassandra_;
- Stats* global_stats_;
- int64_t start_;
- std::mutex mutex_;
- int send_num_[200];
-};
-
-} // namespace cassandra
-} // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/framework/consensus_test.cpp b/platform/consensus/ordering/cassandra/framework/consensus_test.cpp
deleted file mode 100644
index 2c8834a8..00000000
--- a/platform/consensus/ordering/cassandra/framework/consensus_test.cpp
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Copyright (c) 2019-2022 ExpoLab, UC Davis
- *
- * Permission is hereby granted, free of charge, to any person
- * obtaining a copy of this software and associated documentation
- * files (the "Software"), to deal in the Software without
- * restriction, including without limitation the rights to use,
- * copy, modify, merge, publish, distribute, sublicense, and/or
- * sell copies of the Software, and to permit persons to whom the
- * Software is furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be
- * included in all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
- * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
- * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
- * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
- * DEALINGS IN THE SOFTWARE.
- *
- */
-
-#include "platform/consensus/ordering/cassandra/framework/consensus.h"
-
-#include <glog/logging.h>
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
-
-#include <future>
-
-#include "common/test/test_macros.h"
-#include "executor/common/mock_transaction_manager.h"
-#include "platform/config/resdb_config_utils.h"
-#include "platform/networkstrate/mock_replica_communicator.h"
-
-namespace resdb {
-namespace cassandra {
-namespace {
-
-using ::resdb::testing::EqualsProto;
-using ::testing::_;
-using ::testing::Invoke;
-using ::testing::Test;
-
-ResDBConfig GetConfig() {
- ResDBConfig config({GenerateReplicaInfo(1, "127.0.0.1", 1234),
- GenerateReplicaInfo(2, "127.0.0.1", 1235),
- GenerateReplicaInfo(3, "127.0.0.1", 1236),
- GenerateReplicaInfo(4, "127.0.0.1", 1237)},
- GenerateReplicaInfo(1, "127.0.0.1", 1234));
- return config;
-}
-
-class ConsensusTest : public Test {
- public:
- ConsensusTest() : config_(GetConfig()) {
- auto transaction_manager =
- std::make_unique<MockTransactionExecutorDataImpl>();
- mock_transaction_manager_ = transaction_manager.get();
- consensus_ =
- std::make_unique<Consensus>(config_, std::move(transaction_manager));
- consensus_->SetCommunicator(&replica_communicator_);
- }
-
- void AddTransaction(const std::string& data) {
- auto request = std::make_unique<Request>();
- request->set_type(Request::TYPE_NEW_TXNS);
-
- Transaction txn;
-
- BatchUserRequest batch_request;
- auto req = batch_request.add_user_requests();
- req->mutable_request()->set_data(data);
-
- batch_request.set_local_id(1);
- batch_request.SerializeToString(txn.mutable_data());
-
- txn.SerializeToString(request->mutable_data());
-
- EXPECT_EQ(consensus_->ConsensusCommit(nullptr, std::move(request)), 0);
- }
-
- protected:
- ResDBConfig config_;
- MockTransactionExecutorDataImpl* mock_transaction_manager_;
- MockReplicaCommunicator replica_communicator_;
- std::unique_ptr<TransactionManager> transaction_manager_;
- std::unique_ptr<Consensus> consensus_;
-};
-
-TEST_F(ConsensusTest, NormalCase) {
- std::promise<bool> commit_done;
- std::future<bool> commit_done_future = commit_done.get_future();
-
- EXPECT_CALL(replica_communicator_, BroadCast)
- .WillRepeatedly(Invoke([&](const google::protobuf::Message& msg) {
- Request request = *dynamic_cast<const Request*>(&msg);
-
- if (request.user_type() == MessageType::NewProposal) {
- LOG(ERROR) << "bc new proposal";
- consensus_->ConsensusCommit(nullptr,
- std::make_unique<Request>(request));
- LOG(ERROR) << "recv proposal done";
- }
- if (request.user_type() == MessageType::Vote) {
- LOG(ERROR) << "bc vote";
-
- VoteMessage ack_msg;
- assert(ack_msg.ParseFromString(request.data()));
- for (int i = 1; i <= 3; ++i) {
- ack_msg.set_proposer_id(i);
- auto new_req = std::make_unique<Request>(request);
- ack_msg.SerializeToString(new_req->mutable_data());
-
- consensus_->ConsensusCommit(nullptr, std::move(new_req));
- }
- }
- // LOG(ERROR)<<"bc type:"<<request->type()<<" user
- // type:"<<request->user_type();
- if (request.user_type() == MessageType::Prepare) {
- LOG(ERROR) << "bc prepare";
-
- VoteMessage ack_msg;
- assert(ack_msg.ParseFromString(request.data()));
- for (int i = 1; i <= 3; ++i) {
- ack_msg.set_proposer_id(i);
- auto new_req = std::make_unique<Request>(request);
- ack_msg.SerializeToString(new_req->mutable_data());
-
- consensus_->ConsensusCommit(nullptr, std::move(new_req));
- }
- }
- if (request.user_type() == MessageType::Voteprep) {
- LOG(ERROR) << "bc voterep:";
-
- VoteMessage ack_msg;
- assert(ack_msg.ParseFromString(request.data()));
- for (int i = 1; i <= 3; ++i) {
- ack_msg.set_proposer_id(i);
- auto new_req = std::make_unique<Request>(request);
- ack_msg.SerializeToString(new_req->mutable_data());
- LOG(ERROR) << "new request type:" << new_req->user_type();
-
- consensus_->ConsensusCommit(nullptr, std::move(new_req));
- }
- }
- LOG(ERROR) << "done";
- return 0;
- }));
-
- EXPECT_CALL(*mock_transaction_manager_, ExecuteData)
- .WillOnce(Invoke([&](const std::string& msg) {
- LOG(ERROR) << "execute txn:" << msg;
- EXPECT_EQ(msg, "transaction1");
- return nullptr;
- }));
-
- EXPECT_CALL(replica_communicator_, SendMessage(_, 0))
- .WillRepeatedly(
- Invoke([&](const google::protobuf::Message& msg, int64_t) {
- Request request = *dynamic_cast<const Request*>(&msg);
- if (request.type() == Request::TYPE_RESPONSE) {
- LOG(ERROR) << "get response";
- commit_done.set_value(true);
- }
- return;
- }));
-
- AddTransaction("transaction1");
-
- commit_done_future.get();
-}
-
-} // namespace
-} // namespace cassandra
-} // namespace resdb
diff --git a/platform/consensus/ordering/cassandra/proto/BUILD b/platform/consensus/ordering/cassandra/proto/BUILD
deleted file mode 100644
index 558db374..00000000
--- a/platform/consensus/ordering/cassandra/proto/BUILD
+++ /dev/null
@@ -1,16 +0,0 @@
-package(default_visibility = ["//platform/consensus/ordering/cassandra:__subpackages__"])
-
-load("@rules_cc//cc:defs.bzl", "cc_proto_library")
-load("@rules_proto//proto:defs.bzl", "proto_library")
-load("@rules_proto_grpc//python:defs.bzl", "python_proto_library")
-
-proto_library(
- name = "proposal_proto",
- srcs = ["proposal.proto"],
- #visibility = ["//visibility:public"],
-)
-
-cc_proto_library(
- name = "proposal_cc_proto",
- deps = [":proposal_proto"],
-)
diff --git a/platform/consensus/ordering/cassandra/proto/proposal.proto b/platform/consensus/ordering/cassandra/proto/proposal.proto
deleted file mode 100644
index 1fe96ba1..00000000
--- a/platform/consensus/ordering/cassandra/proto/proposal.proto
+++ /dev/null
@@ -1,119 +0,0 @@
-
-syntax = "proto3";
-
-package resdb.cassandra;
-
-message Transaction{
- int32 id = 1;
- bytes data = 2;
- bytes hash = 3;
- int32 proxy_id = 4;
- int32 proposer_id = 5;
- int64 uid = 6;
- int64 create_time = 7;
- int64 queuing_time = 8;
-}
-
-message Header {
- bytes hash = 1;
- int32 height = 2;
- int32 proposer_id = 3;
- int32 proposal_id = 4;
- bytes prehash = 5;
-}
-
-message History {
- bytes hash = 1;
- int32 state = 2;
- int32 sender = 3;
- int32 id = 4;
-}
-
-message WeakProposal {
- repeated bytes hash = 1;
-}
-
-message Proposal {
- Header header = 1;
- repeated History history = 2;
- int32 nonce = 3;
- repeated Transaction transactions = 4;
- uint64 create_time = 5;
- repeated Block block = 6;
- WeakProposal weak_proposals = 7;
-};
-
-enum MessageType {
- NewProposal = 0;
- Vote = 1;
- Prepare = 2;
- VoteAck = 3;
- Commit = 4;
- Recovery = 5;
- NewBlocks = 6;
- ProposalAck = 7;
- CMD_BlockACK = 8;
- CMD_BlockQuery = 9;
- CMD_SingleBlock = 10;
- CMD_ProposalQuery = 11;
- CMD_ProposalQueryResponse = 12;
-};
-
-message VoteMessage {
- bytes hash = 1;
- int32 proposer_id = 2;
- MessageType type = 3;
- int32 sender_id = 4;
- int32 proposal_id = 5;
-};
-
-message CommittedProposals{
- repeated Proposal proposals = 1;
- int32 sender_id = 2;
-};
-
-message HashValue{
- repeated uint64 bits = 1;
-};
-
-message Block {
- message BlockData {
- repeated Transaction transaction = 1;
- }
- BlockData data = 1;
- bytes hash = 2;
- int32 sender_id = 3;
- int32 local_id = 4;
- int64 create_time = 5;
-}
-
-message BlockACK {
- bytes hash = 2;
- int32 sender_id = 3;
- int32 local_id = 4;
- int32 responder = 5;
-}
-
-message BlockQuery {
- bytes hash = 2;
- int32 proposer = 3;
- int32 local_id = 4;
- int32 sender = 5;
-}
-
-message ProposalQuery {
- bytes hash = 2;
- int32 proposer = 3;
- int32 id = 4;
- int32 sender = 5;
-}
-
-message ProposalQueryResp {
- repeated Proposal proposal = 1;
-}
-
-message ProposalResponse {
- Header header = 1;
- int32 leader = 2;
-}
-