This is an automated email from the ASF dual-hosted git repository.
junchao pushed a commit to branch smart_contract_merge
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/smart_contract_merge by this
push:
new e664bc35 merge master
e664bc35 is described below
commit e664bc35bdeb03467916047fd6fcb2dcd5dc14d6
Author: cjcchen <[email protected]>
AuthorDate: Fri Feb 14 02:58:26 2025 +0800
merge master
---
platform/config/resdb_config.cpp | 7 +-
platform/config/resdb_config.h | 14 +-
.../consensus/execution/transaction_executor.cpp | 394 ++-------------------
.../consensus/execution/transaction_executor.h | 51 +--
4 files changed, 39 insertions(+), 427 deletions(-)
diff --git a/platform/config/resdb_config.cpp b/platform/config/resdb_config.cpp
index 08c011ab..4c3ff954 100644
--- a/platform/config/resdb_config.cpp
+++ b/platform/config/resdb_config.cpp
@@ -62,7 +62,7 @@ ResDBConfig::ResDBConfig(const ResConfigData& config_data,
config_data_.set_view_change_timeout_ms(viewchange_commit_timeout_ms_);
}
if (config_data_.client_batch_num() == 0) {
- config_data_.set_client_batch_num(100);
+ config_data_.set_client_batch_num(client_batch_num_);
}
if (config_data_.worker_num() == 0) {
config_data_.set_worker_num(worker_num_);
@@ -76,9 +76,6 @@ ResDBConfig::ResDBConfig(const ResConfigData& config_data,
if (config_data_.tcp_batch_num() == 0) {
config_data_.set_tcp_batch_num(100);
}
- if (config_data_.max_process_txn() == 0) {
- config_data_.set_max_process_txn(64);
- }
}
void ResDBConfig::SetConfigData(const ResConfigData& config_data) {
@@ -180,7 +177,7 @@ void ResDBConfig::SetSignatureVerifierEnabled(bool enable_sv) {
}
// Performance setting
-bool ResDBConfig::IsPerformanceRunning() const {
+bool ResDBConfig::IsPerformanceRunning() {
return is_performance_running_ || GetConfigData().is_performance_running();
}
diff --git a/platform/config/resdb_config.h b/platform/config/resdb_config.h
index 0284ae9a..fb56a9dd 100644
--- a/platform/config/resdb_config.h
+++ b/platform/config/resdb_config.h
@@ -94,7 +94,7 @@ class ResDBConfig {
void SetSignatureVerifierEnabled(bool enable_sv);
// Performance setting
- bool IsPerformanceRunning() const;
+ bool IsPerformanceRunning();
void RunningPerformance(bool);
bool IsTestMode() const;
@@ -135,17 +135,15 @@ class ResDBConfig {
bool signature_verifier_enabled_ = true;
bool is_performance_running_ = false;
bool is_test_mode_ = false;
+ uint32_t max_process_txn_ = 2048;
uint32_t client_batch_wait_time_ms_ = 100; // milliseconds, 0.1s
+ uint32_t client_batch_num_ = 100;
uint64_t viewchange_commit_timeout_ms_ =
60000; // default 60s to change viewchange
-
- // This is the default settings.
- // change these parameters in the configuration.
- uint32_t max_process_txn_ = 64;
- uint32_t worker_num_ = 16;
- uint32_t input_worker_num_ = 5;
- uint32_t output_worker_num_ = 5;
+ uint32_t worker_num_ = 64;
+ uint32_t input_worker_num_ = 1;
+ uint32_t output_worker_num_ = 1;
};
} // namespace resdb
diff --git a/platform/consensus/execution/transaction_executor.cpp b/platform/consensus/execution/transaction_executor.cpp
index 1f98babd..fd24da3a 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -20,7 +20,6 @@
#include "platform/consensus/execution/transaction_executor.h"
#include <glog/logging.h>
-#include "common/utils/utils.h"
namespace resdb {
@@ -36,86 +35,30 @@ TransactionExecutor::TransactionExecutor(
execute_queue_("execute"),
stop_(false),
duplicate_manager_(nullptr) {
-
- memset(blucket_, 0, sizeof(blucket_));
global_stats_ = Stats::GetGlobalStats();
ordering_thread_ = std::thread(&TransactionExecutor::OrderMessage, this);
- for (int i = 0; i < execute_thread_num_; ++i) {
- execute_thread_.push_back(
- std::thread(&TransactionExecutor::ExecuteMessage, this));
- }
-
- for (int i = 0; i < 1; ++i) {
- prepare_thread_.push_back(
- std::thread(&TransactionExecutor::PrepareMessage, this));
- }
+ execute_thread_ = std::thread(&TransactionExecutor::ExecuteMessage, this);
if (transaction_manager_ && transaction_manager_->IsOutOfOrder()) {
execute_OOO_thread_ =
std::thread(&TransactionExecutor::ExecuteMessageOutOfOrder, this);
LOG(ERROR) << " is out of order:" << transaction_manager_->IsOutOfOrder();
}
- gc_thread_ = std::thread(&TransactionExecutor::GCProcess, this);
}
TransactionExecutor::~TransactionExecutor() { Stop(); }
-void TransactionExecutor::RegisterExecute(int64_t seq) {
- if (execute_thread_num_ == 1) return;
- int idx = seq % blucket_num_;
- std::unique_lock<std::mutex> lk(mutex_);
- // LOG(ERROR)<<"register seq:"<<seq<<" bluck:"<<blucket_[idx];
- assert(!blucket_[idx] || !(blucket_[idx] ^ 3));
- blucket_[idx] = 1;
- // LOG(ERROR)<<"register seq:"<<seq;
-}
-
-void TransactionExecutor::WaitForExecute(int64_t seq) {
- if (execute_thread_num_ == 1) return;
- int pre_idx = (seq - 1 + blucket_num_) % blucket_num_;
-
- while (!IsStop()) {
- std::unique_lock<std::mutex> lk(mutex_);
- cv_.wait_for(lk, std::chrono::milliseconds(10000), [&] {
- return ((blucket_[pre_idx] & 2) || !blucket_[pre_idx]);
- });
- if ((blucket_[pre_idx] & 2) || !blucket_[pre_idx]) {
- break;
- }
- }
- // LOG(ERROR)<<"wait for :"<<seq<<" done";
-}
-
-void TransactionExecutor::FinishExecute(int64_t seq) {
- if (execute_thread_num_ == 1) return;
- int idx = seq % blucket_num_;
- std::unique_lock<std::mutex> lk(mutex_);
- // LOG(ERROR)<<"finish :"<<seq<<" done";
- blucket_[idx] = 3;
- cv_.notify_all();
-}
-
void TransactionExecutor::Stop() {
stop_ = true;
if (ordering_thread_.joinable()) {
ordering_thread_.join();
}
- for (auto& th : execute_thread_) {
- if (th.joinable()) {
- th.join();
- }
- }
- for (auto& th : prepare_thread_) {
- if (th.joinable()) {
- th.join();
- }
+ if (execute_thread_.joinable()) {
+ execute_thread_.join();
}
if (execute_OOO_thread_.joinable()) {
execute_OOO_thread_.join();
}
- if (gc_thread_.joinable()) {
- gc_thread_.join();
- }
}
Storage* TransactionExecutor::GetStorage() {
@@ -201,12 +144,6 @@ void TransactionExecutor::OrderMessage() {
return;
}
-void TransactionExecutor::AddExecuteMessage(std::unique_ptr<Request> message) {
- global_stats_->IncCommit();
- message->set_commit_time(GetCurrentTime());
- execute_queue_.Push(std::move(message));
-}
-
void TransactionExecutor::ExecuteMessage() {
while (!IsStop()) {
auto message = execute_queue_.Pop();
@@ -217,11 +154,7 @@ void TransactionExecutor::ExecuteMessage() {
if (transaction_manager_ && transaction_manager_->IsOutOfOrder()) {
need_execute = false;
}
- int64_t start_time = GetCurrentTime();
- global_stats_->AddExecuteQueuingLatency(start_time -
message->commit_time());
Execute(std::move(message), need_execute);
- int64_t end_time = GetCurrentTime();
- global_stats_->AddExecuteLatency(end_time-start_time);
}
}
@@ -251,7 +184,10 @@ void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
// LOG(INFO) << " get request batch size:"
// << batch_request.user_requests_size()<<" proxy
// id:"<<request->proxy_id();
+ // std::unique_ptr<BatchUserResponse> batch_response =
+ // std::make_unique<BatchUserResponse>();
std::unique_ptr<BatchUserResponse> response;
+ global_stats_->GetTransactionDetails(batch_request);
if (transaction_manager_) {
response = transaction_manager_->ExecuteBatch(batch_request);
}
@@ -262,321 +198,51 @@ void TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
void TransactionExecutor::Execute(std::unique_ptr<Request> request,
bool need_execute) {
- uint64_t uid = request->uid();
- int64_t seq = request->seq();
- RegisterExecute(request->seq());
- std::unique_ptr<BatchUserRequest> batch_request = nullptr;
- std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>
data;
- std::vector<std::unique_ptr<google::protobuf::Message>> * data_p = nullptr;
- BatchUserRequest* batch_request_p = nullptr;
-
- bool need_gc = false;
-
- if (request->uid() > 0) {
- bool current_f = SetFlag(uid, Start_Execute);
- if (!current_f) {
- global_stats_->ConsumeTransactions(1);
- std::unique_ptr<std::future<int>> data_f = GetFuture(uid);
- //LOG(ERROR)<<"wait prepare:"<<uid;
- {
- data_f->get();
- }
- //LOG(ERROR)<<"wait prepare done:"<<uid;
-
- // prepare is done
- //LOG(ERROR)<<"prepare is done:"<<uid;
- {
- int64_t start_time = GetCurrentTime();
- std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
- if(req_[uid % mod][uid] == nullptr){
- LOG(ERROR)<<"data is empty:"<<uid;
- }
- assert(req_[uid % mod][uid] != nullptr);
- //batch_request = std::move(req_[uid % mod][uid]);
- batch_request_p = req_[uid % mod][uid].get();
- auto it = data_[uid % mod].find(uid);
- if (it != data_[uid % mod].end()) {
- assert(it->second!=nullptr);
- data_p = it->second.get();
- //data = std::move(it->second);
- }
- int64_t end_time = GetCurrentTime();
- if (end_time - start_time > 1000) {
- LOG(ERROR) << "get data done:" << uid
- << " wait time:" << (end_time - start_time);
- }
- }
- ClearPromise(uid);
- need_gc = true;
- } else {
- global_stats_->AddNewTransactions(1);
- //LOG(ERROR)<<"commit start:"<<uid;
- // LOG(ERROR)<<" prepare not start:"<<uid;
- }
- }
-
// Execute the request, then send the response back to the user.
- if (batch_request_p == nullptr) {
- batch_request = std::make_unique<BatchUserRequest>();
- if (!batch_request->ParseFromString(request->data())) {
- LOG(ERROR) << "parse data fail";
- }
- batch_request->set_hash(request->hash());
- if (request->has_committed_certs()) {
- *batch_request->mutable_committed_certs() = request->committed_certs();
- }
- batch_request->set_seq(request->seq());
- batch_request->set_proxy_id(request->proxy_id());
- batch_request_p = batch_request.get();
- // LOG(ERROR)<<"get data from req:";
- } else {
- assert(batch_request_p);
- batch_request_p->set_seq(request->seq());
- batch_request_p->set_proxy_id(request->proxy_id());
- // LOG(ERROR)<<" get from cache:"<<uid;
+ BatchUserRequest batch_request;
+ if (!batch_request.ParseFromString(request->data())) {
+ LOG(ERROR) << "parse data fail";
+ }
+ batch_request.set_seq(request->seq());
+ batch_request.set_hash(request->hash());
+ batch_request.set_proxy_id(request->proxy_id());
+ if (request->has_committed_certs()) {
+ *batch_request.mutable_committed_certs() = request->committed_certs();
}
- assert(batch_request_p);
// LOG(INFO) << " get request batch size:"
- // << batch_request.user_requests_size()<<" proxy id:"
- // <<request->proxy_id()<<" need execute:"<<need_execute;
+ // << batch_request.user_requests_size()<<" proxy
+ // id:"<<request->proxy_id()<<" need execute:"<<need_execute;
+ // std::unique_ptr<BatchUserResponse> batch_response =
+ // std::make_unique<BatchUserResponse>();
std::unique_ptr<BatchUserResponse> response;
- // need_execute = false;
+ global_stats_->GetTransactionDetails(batch_request);
if (transaction_manager_ && need_execute) {
- if (execute_thread_num_ == 1) {
- response = transaction_manager_->ExecuteBatch(*batch_request_p);
- } else {
- std::vector<std::unique_ptr<std::string>> response_v;
-
- if(data_p == nullptr) {
- int64_t start_time = GetCurrentTime();
- data = std::move(transaction_manager_->Prepare(*batch_request_p));
- int64_t end_time = GetCurrentTime();
- if (end_time - start_time > 10) {
- // LOG(ERROR)<<"exec data done:"<<uid<<" wait
- // time:"<<(end_time-start_time);
- }
- data_p = data.get();
- }
+ response = transaction_manager_->ExecuteBatch(batch_request);
+ }
- WaitForExecute(request->seq());
- if(data_p->empty() || (*data_p)[0] == nullptr){
- response =
transaction_manager_->ExecuteBatch(*batch_request_p);
- }
- else {
- response_v =
transaction_manager_->ExecuteBatchData(*data_p);
- }
- FinishExecute(request->seq());
-
- if(response == nullptr){
- response = std::make_unique<BatchUserResponse>();
- for (auto& s : response_v) {
- response->add_response()->swap(*s);
- }
- }
- }
+ if (duplicate_manager_) {
+ duplicate_manager_->AddExecuted(batch_request.hash(), batch_request.seq());
}
- // LOG(ERROR)<<" CF = :"<<(cf==1)<<" uid:"<<uid;
+ global_stats_->IncTotalRequest(batch_request.user_requests_size());
if (response == nullptr) {
response = std::make_unique<BatchUserResponse>();
}
- global_stats_->IncTotalRequest(batch_request_p->user_requests_size());
- response->set_proxy_id(batch_request_p->proxy_id());
- response->set_createtime(batch_request_p->createtime() +
request->queuing_time());
- response->set_local_id(batch_request_p->local_id());
- global_stats_->AddCommitDelay(GetCurrentTime()- response->createtime());
- response->set_seq(request->seq());
+ response->set_proxy_id(batch_request.proxy_id());
+ response->set_createtime(batch_request.createtime());
+ response->set_local_id(batch_request.local_id());
+ response->set_hash(batch_request.hash());
- if (post_exec_func_) {
- post_exec_func_(std::move(request), std::move(response));
- }
+ post_exec_func_(std::move(request), std::move(response));
global_stats_->IncExecuteDone();
- if(need_gc){
- gc_queue_.Push(std::make_unique<int64_t>(uid));
- }
}
void TransactionExecutor::SetDuplicateManager(DuplicateManager* manager) {
duplicate_manager_ = manager;
}
-
-bool TransactionExecutor::SetFlag(uint64_t uid, int f) {
- std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
- auto it = flag_[uid % mod].find(uid);
- if (it == flag_[uid % mod].end()) {
- flag_[uid % mod][uid] |= f;
- // LOG(ERROR)<<"NO FUTURE uid:"<<uid;
- return true;
- }
- assert(it != flag_[uid % mod].end());
- if (f == Start_Prepare) {
- if (flag_[uid % mod][uid] & Start_Execute) {
- return false;
- }
- } else if(f == Start_Execute){
- if (flag_[uid % mod][uid] & End_Prepare) {
- //if (flag_[uid % mod][uid] & Start_Prepare) {
- return false;
- }
- }
- flag_[uid % mod][uid] |= f;
- return true;
-}
-
-void TransactionExecutor::ClearPromise(uint64_t uid) {
- std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
- auto it = pre_[uid % mod].find(uid);
- if (it == pre_[uid % mod].end()) {
- return;
- }
- // LOG(ERROR)<<"CLEAR UID:"<<uid;
- assert(it != pre_[uid % mod].end());
- assert(flag_[uid % mod].find(uid) != flag_[uid % mod].end());
- //assert(data_[uid%mod].find(uid) != data_[uid%mod].end());
- //assert(req_[uid%mod].find(uid) != req_[uid%mod].end());
- //data_[uid%mod].erase(data_[uid%mod].find(uid));
- //req_[uid%mod].erase(req_[uid%mod].find(uid));
- pre_[uid % mod].erase(it);
- flag_[uid % mod].erase(flag_[uid % mod].find(uid));
-}
-
-std::promise<int>* TransactionExecutor::GetPromise(uint64_t uid) {
- std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
- auto it = pre_[uid % mod].find(uid);
- if (it == pre_[uid % mod].end()) {
- return nullptr;
- }
- return it->second.get();
-}
-
-std::unique_ptr<std::future<int>> TransactionExecutor::GetFuture(uint64_t uid)
{
- std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
- auto it = pre_[uid % mod].find(uid);
- if (it == pre_[uid % mod].end()) {
- return nullptr;
- }
- //return std::move(it->second);
- // LOG(ERROR)<<"add future:"<<uid;
- return std::make_unique<std::future<int>>(it->second->get_future());
-}
-
-bool TransactionExecutor::AddFuture(uint64_t uid) {
- std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
- auto it = pre_[uid % mod].find(uid);
- if (it == pre_[uid % mod].end()) {
- // LOG(ERROR)<<"add future:"<<uid;
- std::unique_ptr<std::promise<int>> p =
- std::make_unique<std::promise<int>>();
- //auto f = std::make_unique<std::future<int>>(p->get_future());
- pre_[uid % mod][uid] = std::move(p);
- //pre_f_[uid % mod][uid] = std::move(f);
- flag_[uid % mod][uid] = 0;
- return true;
- }
- return false;
-}
-
-void TransactionExecutor::Prepare(std::unique_ptr<Request> request) {
- if (AddFuture(request->uid())) {
- prepare_queue_.Push(std::move(request));
- }
-}
-
-void TransactionExecutor::GCProcess() {
- while (!IsStop()) {
- std::unique_ptr<int64_t> uid_or = gc_queue_.Pop();
- if (uid_or== nullptr) {
- continue;
- }
- int64_t uid = *uid_or;
-
- std::vector<std::unique_ptr<google::protobuf::Message>> * data_p = nullptr;
- {
- std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
- assert(data_[uid%mod].find(uid) != data_[uid%mod].end());
- data_p = data_[uid%mod][uid].get();
- }
-
- for(int i = 0; i < data_p->size(); ++i){
- (*data_p)[i].release();
- }
- (*data_p).clear();
- {
- std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
- assert(req_[uid%mod].find(uid) != req_[uid%mod].end());
- assert(data_[uid%mod].find(uid) != data_[uid%mod].end());
- data_[uid%mod].erase(data_[uid%mod].find(uid));
- req_[uid%mod].erase(req_[uid%mod].find(uid));
- }
- }
-}
-
-void TransactionExecutor::PrepareMessage() {
- while (!IsStop()) {
- std::unique_ptr<Request> request = prepare_queue_.Pop();
- if (request == nullptr) {
- continue;
- }
-
- uint64_t uid = request->uid();
- int current_f = SetFlag(uid, Start_Prepare);
- if (current_f == 0) {
- // commit has done
- // LOG(ERROR)<<" want prepare, commit started:"<<uid;
-// ClearPromise(uid);
- continue;
- }
-
- std::promise<int>* p = GetPromise(uid) ;
- assert(p);
- //LOG(ERROR)<<" prepare started:"<<uid;
-
- // LOG(ERROR)<<" prepare uid:"<<uid;
-
- // Execute the request, then send the response back to the user.
- std::unique_ptr<BatchUserRequest> batch_request =
- std::make_unique<BatchUserRequest>();
- if (!batch_request->ParseFromString(request->data())) {
- LOG(ERROR) << "parse data fail";
- }
- // batch_request = std::make_unique<BatchUserRequest>();
- batch_request->set_seq(request->seq());
- batch_request->set_hash(request->hash());
- batch_request->set_proxy_id(request->proxy_id());
- if (request->has_committed_certs()) {
- *batch_request->mutable_committed_certs() = request->committed_certs();
- }
-
- // LOG(ERROR)<<"prepare seq:"<<batch_request->seq()<<" proxy
- // id:"<<request->proxy_id()<<" local id:"<<batch_request->local_id();
-
- std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>
- request_v = transaction_manager_->Prepare(*batch_request);
- {
- std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
- // assert(request_v);
- // assert(data_[uid%mod].find(uid) == data_[uid%mod].end());
- data_[uid%mod][uid] = std::move(request_v);
- req_[uid % mod][uid] = std::move(batch_request);
- }
- //LOG(ERROR)<<"set promise:"<<uid;
- p->set_value(1);
- {
- int set_ret = SetFlag(uid, End_Prepare);
- if (set_ret == 0) {
- // LOG(ERROR)<<"commit interrupt:"<<uid;
- //ClearPromise(uid);
- } else {
- //LOG(ERROR)<<"prepare done:"<<uid;
- }
- }
- }
-}
-
-
} // namespace resdb
diff --git a/platform/consensus/execution/transaction_executor.h b/platform/consensus/execution/transaction_executor.h
index cbd31c60..2b111164 100644
--- a/platform/consensus/execution/transaction_executor.h
+++ b/platform/consensus/execution/transaction_executor.h
@@ -62,16 +62,8 @@ class TransactionExecutor {
void SetDuplicateManager(DuplicateManager* manager);
- void AddExecuteMessage(std::unique_ptr<Request> message);
-
Storage* GetStorage();
- void RegisterExecute(int64_t seq);
- void WaitForExecute(int64_t seq);
- void FinishExecute(int64_t seq);
-
- void Prepare(std::unique_ptr<Request> request);
-
private:
void Execute(std::unique_ptr<Request> request, bool need_execute = true);
void OnlyExecute(std::unique_ptr<Request> request);
@@ -88,15 +80,6 @@ class TransactionExecutor {
void UpdateMaxExecutedSeq(uint64_t seq);
- bool SetFlag(uint64_t uid, int f);
- void ClearPromise(uint64_t uid);
- void PrepareMessage();
- void GCProcess();
-
- bool AddFuture(uint64_t uid);
- std::unique_ptr<std::future<int>> GetFuture(uint64_t uid);
- std::promise<int>* GetPromise(uint64_t uid);
-
protected:
ResDBConfig config_;
@@ -108,43 +91,11 @@ class TransactionExecutor {
SystemInfo* system_info_ = nullptr;
std::unique_ptr<TransactionManager> transaction_manager_ = nullptr;
std::map<uint64_t, std::unique_ptr<Request>> candidates_;
- std::thread ordering_thread_, execute_OOO_thread_;
- std::vector<std::thread> execute_thread_;
+ std::thread ordering_thread_, execute_thread_, execute_OOO_thread_;
LockFreeQueue<Request> commit_queue_, execute_queue_, execute_OOO_queue_;
std::atomic<bool> stop_;
Stats* global_stats_ = nullptr;
DuplicateManager* duplicate_manager_;
- int execute_thread_num_ = 10;
- static const int blucket_num_ = 1024;
- int blucket_[blucket_num_];
- std::condition_variable cv_;
- std::mutex mutex_;
-
- enum PrepareType {
- Start_Prepare = 1,
- Start_Execute = 2,
- End_Prepare = 4,
- };
-
-
- std::vector<std::thread> prepare_thread_;
- std::thread gc_thread_;
- static const int mod = 2048;
- std::mutex f_mutex_[mod], fd_mutex_[mod];
- LockFreeQueue<Request> prepare_queue_;
- LockFreeQueue<int64_t> gc_queue_;
- typedef std::unique_ptr<std::promise<int>> PromiseType;
- std::map<uint64_t, PromiseType> pre_[mod];
-
- std::map<uint64_t, std::unique_ptr<std::future<int>>> pre_f_[mod];
- std::map<uint64_t, int> flag_[mod];
-
- std::map<uint64_t, std::unique_ptr<BatchUserRequest>> req_[mod];
- std::unordered_map<
- uint64_t,
- std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>>
- data_[mod];
-
};
} // namespace resdb