This is an automated email from the ASF dual-hosted git repository.

junchao pushed a commit to branch smart_contract_merge
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/smart_contract_merge by this 
push:
     new e664bc35 merge master
e664bc35 is described below

commit e664bc35bdeb03467916047fd6fcb2dcd5dc14d6
Author: cjcchen <[email protected]>
AuthorDate: Fri Feb 14 02:58:26 2025 +0800

    merge master
---
 platform/config/resdb_config.cpp                   |   7 +-
 platform/config/resdb_config.h                     |  14 +-
 .../consensus/execution/transaction_executor.cpp   | 394 ++-------------------
 .../consensus/execution/transaction_executor.h     |  51 +--
 4 files changed, 39 insertions(+), 427 deletions(-)

diff --git a/platform/config/resdb_config.cpp b/platform/config/resdb_config.cpp
index 08c011ab..4c3ff954 100644
--- a/platform/config/resdb_config.cpp
+++ b/platform/config/resdb_config.cpp
@@ -62,7 +62,7 @@ ResDBConfig::ResDBConfig(const ResConfigData& config_data,
     config_data_.set_view_change_timeout_ms(viewchange_commit_timeout_ms_);
   }
   if (config_data_.client_batch_num() == 0) {
-    config_data_.set_client_batch_num(100);
+    config_data_.set_client_batch_num(client_batch_num_);
   }
   if (config_data_.worker_num() == 0) {
     config_data_.set_worker_num(worker_num_);
@@ -76,9 +76,6 @@ ResDBConfig::ResDBConfig(const ResConfigData& config_data,
   if (config_data_.tcp_batch_num() == 0) {
     config_data_.set_tcp_batch_num(100);
   }
-  if (config_data_.max_process_txn() == 0) {
-    config_data_.set_max_process_txn(64);
-  }
 }
 
 void ResDBConfig::SetConfigData(const ResConfigData& config_data) {
@@ -180,7 +177,7 @@ void ResDBConfig::SetSignatureVerifierEnabled(bool 
enable_sv) {
 }
 
 // Performance setting
-bool ResDBConfig::IsPerformanceRunning() const {
+bool ResDBConfig::IsPerformanceRunning() {
   return is_performance_running_ || GetConfigData().is_performance_running();
 }
 
diff --git a/platform/config/resdb_config.h b/platform/config/resdb_config.h
index 0284ae9a..fb56a9dd 100644
--- a/platform/config/resdb_config.h
+++ b/platform/config/resdb_config.h
@@ -94,7 +94,7 @@ class ResDBConfig {
   void SetSignatureVerifierEnabled(bool enable_sv);
 
   // Performance setting
-  bool IsPerformanceRunning() const;
+  bool IsPerformanceRunning();
   void RunningPerformance(bool);
 
   bool IsTestMode() const;
@@ -135,17 +135,15 @@ class ResDBConfig {
   bool signature_verifier_enabled_ = true;
   bool is_performance_running_ = false;
   bool is_test_mode_ = false;
+  uint32_t max_process_txn_ = 2048;
   uint32_t client_batch_wait_time_ms_ = 100;  // milliseconds, 0.1s
+  uint32_t client_batch_num_ = 100;
   uint64_t viewchange_commit_timeout_ms_ =
       60000;  // default 60s to change viewchange
 
-
-  // This is the default settings.
-  // change these parameters in the configuration.
-  uint32_t max_process_txn_ = 64;
-  uint32_t worker_num_ = 16;
-  uint32_t input_worker_num_ = 5;
-  uint32_t output_worker_num_ = 5;
+  uint32_t worker_num_ = 64;
+  uint32_t input_worker_num_ = 1;
+  uint32_t output_worker_num_ = 1;
 };
 
 }  // namespace resdb
diff --git a/platform/consensus/execution/transaction_executor.cpp 
b/platform/consensus/execution/transaction_executor.cpp
index 1f98babd..fd24da3a 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -20,7 +20,6 @@
 #include "platform/consensus/execution/transaction_executor.h"
 
 #include <glog/logging.h>
-#include "common/utils/utils.h"
 
 namespace resdb {
 
@@ -36,86 +35,30 @@ TransactionExecutor::TransactionExecutor(
       execute_queue_("execute"),
       stop_(false),
       duplicate_manager_(nullptr) {
-
-  memset(blucket_, 0, sizeof(blucket_));
   global_stats_ = Stats::GetGlobalStats();
   ordering_thread_ = std::thread(&TransactionExecutor::OrderMessage, this);
-  for (int i = 0; i < execute_thread_num_; ++i) {
-    execute_thread_.push_back(
-        std::thread(&TransactionExecutor::ExecuteMessage, this));
-  }
-
-  for (int i = 0; i < 1; ++i) {
-    prepare_thread_.push_back(
-        std::thread(&TransactionExecutor::PrepareMessage, this));
-  }
+  execute_thread_ = std::thread(&TransactionExecutor::ExecuteMessage, this);
 
   if (transaction_manager_ && transaction_manager_->IsOutOfOrder()) {
     execute_OOO_thread_ =
         std::thread(&TransactionExecutor::ExecuteMessageOutOfOrder, this);
     LOG(ERROR) << " is out of order:" << transaction_manager_->IsOutOfOrder();
   }
-  gc_thread_ = std::thread(&TransactionExecutor::GCProcess, this);
 }
 
 TransactionExecutor::~TransactionExecutor() { Stop(); }
 
-void TransactionExecutor::RegisterExecute(int64_t seq) {
-  if (execute_thread_num_ == 1) return;
-  int idx = seq % blucket_num_;
-  std::unique_lock<std::mutex> lk(mutex_);
-  // LOG(ERROR)<<"register seq:"<<seq<<" bluck:"<<blucket_[idx];
-  assert(!blucket_[idx] || !(blucket_[idx] ^ 3));
-  blucket_[idx] = 1;
-  // LOG(ERROR)<<"register seq:"<<seq;
-}
-
-void TransactionExecutor::WaitForExecute(int64_t seq) {
-  if (execute_thread_num_ == 1) return;
-  int pre_idx = (seq - 1 + blucket_num_) % blucket_num_;
-
-  while (!IsStop()) {
-    std::unique_lock<std::mutex> lk(mutex_);
-    cv_.wait_for(lk, std::chrono::milliseconds(10000), [&] {
-      return ((blucket_[pre_idx] & 2) || !blucket_[pre_idx]);
-    });
-    if ((blucket_[pre_idx] & 2) || !blucket_[pre_idx]) {
-      break;
-    }
-  }
-  // LOG(ERROR)<<"wait for :"<<seq<<" done";
-}
-
-void TransactionExecutor::FinishExecute(int64_t seq) {
-  if (execute_thread_num_ == 1) return;
-  int idx = seq % blucket_num_;
-  std::unique_lock<std::mutex> lk(mutex_);
-  // LOG(ERROR)<<"finish :"<<seq<<" done";
-  blucket_[idx] = 3;
-  cv_.notify_all();
-}
-
 void TransactionExecutor::Stop() {
   stop_ = true;
   if (ordering_thread_.joinable()) {
     ordering_thread_.join();
   }
-  for (auto& th : execute_thread_) {
-    if (th.joinable()) {
-      th.join();
-    }
-  }
-  for (auto& th : prepare_thread_) {
-    if (th.joinable()) {
-      th.join();
-    }
+  if (execute_thread_.joinable()) {
+    execute_thread_.join();
   }
   if (execute_OOO_thread_.joinable()) {
     execute_OOO_thread_.join();
   }
-  if (gc_thread_.joinable()) {
-    gc_thread_.join();
-  }
 }
 
 Storage* TransactionExecutor::GetStorage() {
@@ -201,12 +144,6 @@ void TransactionExecutor::OrderMessage() {
   return;
 }
 
-void TransactionExecutor::AddExecuteMessage(std::unique_ptr<Request> message) {
-    global_stats_->IncCommit();
-    message->set_commit_time(GetCurrentTime());
-    execute_queue_.Push(std::move(message));
-}
-
 void TransactionExecutor::ExecuteMessage() {
   while (!IsStop()) {
     auto message = execute_queue_.Pop();
@@ -217,11 +154,7 @@ void TransactionExecutor::ExecuteMessage() {
     if (transaction_manager_ && transaction_manager_->IsOutOfOrder()) {
       need_execute = false;
     }
-    int64_t start_time = GetCurrentTime();
-    global_stats_->AddExecuteQueuingLatency(start_time - 
message->commit_time());
     Execute(std::move(message), need_execute);
-    int64_t end_time = GetCurrentTime();
-    global_stats_->AddExecuteLatency(end_time-start_time);
   }
 }
 
@@ -251,7 +184,10 @@ void 
TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
   // LOG(INFO) << " get request batch size:"
   //          << batch_request.user_requests_size()<<" proxy
   //          id:"<<request->proxy_id();
+  // std::unique_ptr<BatchUserResponse> batch_response =
+  //     std::make_unique<BatchUserResponse>();
   std::unique_ptr<BatchUserResponse> response;
+  global_stats_->GetTransactionDetails(batch_request);
   if (transaction_manager_) {
     response = transaction_manager_->ExecuteBatch(batch_request);
   }
@@ -262,321 +198,51 @@ void 
TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
 
 void TransactionExecutor::Execute(std::unique_ptr<Request> request,
                                   bool need_execute) {
-  uint64_t uid = request->uid();
-  int64_t seq = request->seq();
-  RegisterExecute(request->seq());
-  std::unique_ptr<BatchUserRequest> batch_request = nullptr;
-  std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>> 
data;
-  std::vector<std::unique_ptr<google::protobuf::Message>> * data_p = nullptr;
-  BatchUserRequest* batch_request_p = nullptr;
-
-  bool need_gc = false;
-
-  if (request->uid() > 0) {
-    bool current_f = SetFlag(uid, Start_Execute);
-    if (!current_f) {
-      global_stats_->ConsumeTransactions(1);
-      std::unique_ptr<std::future<int>> data_f = GetFuture(uid);
-      //LOG(ERROR)<<"wait prepare:"<<uid;
-      {
-        data_f->get();
-      }
-      //LOG(ERROR)<<"wait prepare done:"<<uid;
-
-      // prepare is done
-      //LOG(ERROR)<<"prepare is done:"<<uid;
-      {
-        int64_t start_time = GetCurrentTime();
-        std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
-        if(req_[uid % mod][uid] == nullptr){
-          LOG(ERROR)<<"data is empty:"<<uid;
-        }
-        assert(req_[uid % mod][uid] != nullptr);
-        //batch_request = std::move(req_[uid % mod][uid]);
-        batch_request_p = req_[uid % mod][uid].get();
-        auto it = data_[uid % mod].find(uid);
-        if (it != data_[uid % mod].end()) {
-          assert(it->second!=nullptr);
-          data_p = it->second.get();
-          //data = std::move(it->second);
-        }
-        int64_t end_time = GetCurrentTime();
-        if (end_time - start_time > 1000) {
-          LOG(ERROR) << "get data done:" << uid
-                     << " wait time:" << (end_time - start_time);
-        }
-      }
-      ClearPromise(uid);
-      need_gc = true;
-    } else {
-      global_stats_->AddNewTransactions(1);
-      //LOG(ERROR)<<"commit start:"<<uid;
-      // LOG(ERROR)<<" prepare not start:"<<uid;
-    }
-  }
-
   // Execute the request, then send the response back to the user.
-  if (batch_request_p == nullptr) {
-    batch_request = std::make_unique<BatchUserRequest>();
-    if (!batch_request->ParseFromString(request->data())) {
-      LOG(ERROR) << "parse data fail";
-    }
-    batch_request->set_hash(request->hash());
-    if (request->has_committed_certs()) {
-      *batch_request->mutable_committed_certs() = request->committed_certs();
-    }
-    batch_request->set_seq(request->seq());
-    batch_request->set_proxy_id(request->proxy_id());
-    batch_request_p = batch_request.get();
-    // LOG(ERROR)<<"get data from req:";
-  } else {
-  assert(batch_request_p);
-    batch_request_p->set_seq(request->seq());
-    batch_request_p->set_proxy_id(request->proxy_id());
-    // LOG(ERROR)<<" get from cache:"<<uid;
+  BatchUserRequest batch_request;
+  if (!batch_request.ParseFromString(request->data())) {
+    LOG(ERROR) << "parse data fail";
+  }
+  batch_request.set_seq(request->seq());
+  batch_request.set_hash(request->hash());
+  batch_request.set_proxy_id(request->proxy_id());
+  if (request->has_committed_certs()) {
+    *batch_request.mutable_committed_certs() = request->committed_certs();
   }
-  assert(batch_request_p);
 
   // LOG(INFO) << " get request batch size:"
-  // << batch_request.user_requests_size()<<" proxy id:"
-  //  <<request->proxy_id()<<" need execute:"<<need_execute;
+  //         << batch_request.user_requests_size()<<" proxy
+  //         id:"<<request->proxy_id()<<" need execute:"<<need_execute;
+  // std::unique_ptr<BatchUserResponse> batch_response =
+  //     std::make_unique<BatchUserResponse>();
 
   std::unique_ptr<BatchUserResponse> response;
-  // need_execute = false;
+  global_stats_->GetTransactionDetails(batch_request);
   if (transaction_manager_ && need_execute) {
-    if (execute_thread_num_ == 1) {
-      response = transaction_manager_->ExecuteBatch(*batch_request_p);
-    } else {
-      std::vector<std::unique_ptr<std::string>> response_v;
-
-      if(data_p == nullptr) {
-        int64_t start_time = GetCurrentTime();
-        data = std::move(transaction_manager_->Prepare(*batch_request_p));
-        int64_t end_time = GetCurrentTime();
-        if (end_time - start_time > 10) {
-          // LOG(ERROR)<<"exec data done:"<<uid<<" wait
-          // time:"<<(end_time-start_time);
-        }
-        data_p = data.get();
-      }
+    response = transaction_manager_->ExecuteBatch(batch_request);
+  }
 
-      WaitForExecute(request->seq());
-           if(data_p->empty() || (*data_p)[0] == nullptr){
-                   response = 
transaction_manager_->ExecuteBatch(*batch_request_p);
-           }
-           else {
-                   response_v = 
transaction_manager_->ExecuteBatchData(*data_p);
-           }
-      FinishExecute(request->seq());
-
-      if(response == nullptr){
-             response = std::make_unique<BatchUserResponse>();
-             for (auto& s : response_v) {
-                     response->add_response()->swap(*s);
-             }
-      }
-    }
+  if (duplicate_manager_) {
+    duplicate_manager_->AddExecuted(batch_request.hash(), batch_request.seq());
   }
-  // LOG(ERROR)<<" CF = :"<<(cf==1)<<" uid:"<<uid;
 
+  global_stats_->IncTotalRequest(batch_request.user_requests_size());
   if (response == nullptr) {
     response = std::make_unique<BatchUserResponse>();
   }
-  global_stats_->IncTotalRequest(batch_request_p->user_requests_size());
-  response->set_proxy_id(batch_request_p->proxy_id());
-  response->set_createtime(batch_request_p->createtime() + 
request->queuing_time());
-  response->set_local_id(batch_request_p->local_id());
-  global_stats_->AddCommitDelay(GetCurrentTime()- response->createtime());
 
-  response->set_seq(request->seq());
+  response->set_proxy_id(batch_request.proxy_id());
+  response->set_createtime(batch_request.createtime());
+  response->set_local_id(batch_request.local_id());
+  response->set_hash(batch_request.hash());
 
-  if (post_exec_func_) {
-    post_exec_func_(std::move(request), std::move(response));
-  }
+  post_exec_func_(std::move(request), std::move(response));
 
   global_stats_->IncExecuteDone();
-  if(need_gc){
-    gc_queue_.Push(std::make_unique<int64_t>(uid));
-  }
 }
 
 void TransactionExecutor::SetDuplicateManager(DuplicateManager* manager) {
   duplicate_manager_ = manager;
 }
 
-
-bool TransactionExecutor::SetFlag(uint64_t uid, int f) {
-  std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
-  auto it = flag_[uid % mod].find(uid);
-  if (it == flag_[uid % mod].end()) {
-    flag_[uid % mod][uid] |= f;
-    // LOG(ERROR)<<"NO FUTURE uid:"<<uid;
-    return true;
-  }
-  assert(it != flag_[uid % mod].end());
-  if (f == Start_Prepare) {
-    if (flag_[uid % mod][uid] & Start_Execute) {
-      return false;
-    }
-  } else if(f == Start_Execute){
-    if (flag_[uid % mod][uid] & End_Prepare) {
-    //if (flag_[uid % mod][uid] & Start_Prepare) {
-      return false;
-    }
-  }
-  flag_[uid % mod][uid] |= f;
-  return true;
-}
-
-void TransactionExecutor::ClearPromise(uint64_t uid) {
-  std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
-  auto it = pre_[uid % mod].find(uid);
-  if (it == pre_[uid % mod].end()) {
-    return;
-  }
-  // LOG(ERROR)<<"CLEAR UID:"<<uid;
-  assert(it != pre_[uid % mod].end());
-  assert(flag_[uid % mod].find(uid) != flag_[uid % mod].end());
-  //assert(data_[uid%mod].find(uid) != data_[uid%mod].end());
-  //assert(req_[uid%mod].find(uid) != req_[uid%mod].end());
-  //data_[uid%mod].erase(data_[uid%mod].find(uid));
-  //req_[uid%mod].erase(req_[uid%mod].find(uid));
-  pre_[uid % mod].erase(it);
-  flag_[uid % mod].erase(flag_[uid % mod].find(uid));
-}
-
-std::promise<int>* TransactionExecutor::GetPromise(uint64_t uid) {
-  std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
-  auto it = pre_[uid % mod].find(uid);
-  if (it == pre_[uid % mod].end()) {
-    return nullptr;
-  }
-  return it->second.get();
-}
-
-std::unique_ptr<std::future<int>> TransactionExecutor::GetFuture(uint64_t uid) 
{
-  std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
-  auto it = pre_[uid % mod].find(uid);
-  if (it == pre_[uid % mod].end()) {
-    return nullptr;
-  }
-  //return std::move(it->second);
-  // LOG(ERROR)<<"add future:"<<uid;
-  return std::make_unique<std::future<int>>(it->second->get_future());
-}
-
-bool TransactionExecutor::AddFuture(uint64_t uid) {
-  std::unique_lock<std::mutex> lk(f_mutex_[uid % mod]);
-  auto it = pre_[uid % mod].find(uid);
-  if (it == pre_[uid % mod].end()) {
-    // LOG(ERROR)<<"add future:"<<uid;
-    std::unique_ptr<std::promise<int>> p =
-        std::make_unique<std::promise<int>>();
-    //auto f = std::make_unique<std::future<int>>(p->get_future());
-    pre_[uid % mod][uid] = std::move(p);
-    //pre_f_[uid % mod][uid] = std::move(f);
-    flag_[uid % mod][uid] = 0;
-    return true;
-  }
-  return false;
-}
-
-void TransactionExecutor::Prepare(std::unique_ptr<Request> request) {
-  if (AddFuture(request->uid())) {
-    prepare_queue_.Push(std::move(request));
-  }
-}
-
-void TransactionExecutor::GCProcess() {
-  while (!IsStop()) {
-    std::unique_ptr<int64_t> uid_or = gc_queue_.Pop();
-    if (uid_or== nullptr) {
-      continue;
-    }
-    int64_t uid = *uid_or;
-  
-    std::vector<std::unique_ptr<google::protobuf::Message>> * data_p = nullptr;
-    {
-      std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
-      assert(data_[uid%mod].find(uid) != data_[uid%mod].end());
-      data_p = data_[uid%mod][uid].get();
-    }
-
-    for(int i = 0; i < data_p->size(); ++i){
-      (*data_p)[i].release();
-    }
-    (*data_p).clear();
-    {
-      std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
-      assert(req_[uid%mod].find(uid) != req_[uid%mod].end());
-      assert(data_[uid%mod].find(uid) != data_[uid%mod].end());
-      data_[uid%mod].erase(data_[uid%mod].find(uid));
-      req_[uid%mod].erase(req_[uid%mod].find(uid));
-    }
-  }
-}
-
-void TransactionExecutor::PrepareMessage() {
-  while (!IsStop()) {
-    std::unique_ptr<Request> request = prepare_queue_.Pop();
-    if (request == nullptr) {
-      continue;
-    }
-
-    uint64_t uid = request->uid();
-    int current_f = SetFlag(uid, Start_Prepare);
-    if (current_f == 0) {
-      // commit has done
-      // LOG(ERROR)<<" want prepare, commit started:"<<uid;
-//      ClearPromise(uid);
-      continue;
-    }
-
-    std::promise<int>* p = GetPromise(uid) ;
-    assert(p);
-    //LOG(ERROR)<<" prepare started:"<<uid;
-
-    // LOG(ERROR)<<" prepare uid:"<<uid;
-
-    // Execute the request, then send the response back to the user.
-    std::unique_ptr<BatchUserRequest> batch_request =
-        std::make_unique<BatchUserRequest>();
-    if (!batch_request->ParseFromString(request->data())) {
-      LOG(ERROR) << "parse data fail";
-    }
-    // batch_request = std::make_unique<BatchUserRequest>();
-    batch_request->set_seq(request->seq());
-    batch_request->set_hash(request->hash());
-    batch_request->set_proxy_id(request->proxy_id());
-    if (request->has_committed_certs()) {
-      *batch_request->mutable_committed_certs() = request->committed_certs();
-    }
-
-    // LOG(ERROR)<<"prepare seq:"<<batch_request->seq()<<" proxy
-    // id:"<<request->proxy_id()<<" local id:"<<batch_request->local_id();
-
-    std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>
-     request_v = transaction_manager_->Prepare(*batch_request);
-    {
-      std::unique_lock<std::mutex> lk(fd_mutex_[uid % mod]);
-   //   assert(request_v);
-      // assert(data_[uid%mod].find(uid) == data_[uid%mod].end());
-      data_[uid%mod][uid] = std::move(request_v);
-      req_[uid % mod][uid] = std::move(batch_request);
-    }
-    //LOG(ERROR)<<"set promise:"<<uid;
-    p->set_value(1);
-    {
-      int set_ret = SetFlag(uid, End_Prepare);
-      if (set_ret == 0) {
-        // LOG(ERROR)<<"commit interrupt:"<<uid;
-        //ClearPromise(uid);
-      } else {
-        //LOG(ERROR)<<"prepare done:"<<uid;
-      }
-    }
-  }
-}
-
-
 }  // namespace resdb
diff --git a/platform/consensus/execution/transaction_executor.h 
b/platform/consensus/execution/transaction_executor.h
index cbd31c60..2b111164 100644
--- a/platform/consensus/execution/transaction_executor.h
+++ b/platform/consensus/execution/transaction_executor.h
@@ -62,16 +62,8 @@ class TransactionExecutor {
 
   void SetDuplicateManager(DuplicateManager* manager);
 
-  void AddExecuteMessage(std::unique_ptr<Request> message);
-
   Storage* GetStorage();
 
-  void RegisterExecute(int64_t seq);
-  void WaitForExecute(int64_t seq);
-  void FinishExecute(int64_t seq);
-
-  void Prepare(std::unique_ptr<Request> request);
-
  private:
   void Execute(std::unique_ptr<Request> request, bool need_execute = true);
   void OnlyExecute(std::unique_ptr<Request> request);
@@ -88,15 +80,6 @@ class TransactionExecutor {
 
   void UpdateMaxExecutedSeq(uint64_t seq);
 
-  bool SetFlag(uint64_t uid, int f);
-  void ClearPromise(uint64_t uid);
-  void PrepareMessage();
-  void GCProcess();
-
-  bool AddFuture(uint64_t uid);
-  std::unique_ptr<std::future<int>> GetFuture(uint64_t uid);
-  std::promise<int>* GetPromise(uint64_t uid);
-
  protected:
   ResDBConfig config_;
 
@@ -108,43 +91,11 @@ class TransactionExecutor {
   SystemInfo* system_info_ = nullptr;
   std::unique_ptr<TransactionManager> transaction_manager_ = nullptr;
   std::map<uint64_t, std::unique_ptr<Request>> candidates_;
-  std::thread ordering_thread_, execute_OOO_thread_;
-  std::vector<std::thread> execute_thread_;
+  std::thread ordering_thread_, execute_thread_, execute_OOO_thread_;
   LockFreeQueue<Request> commit_queue_, execute_queue_, execute_OOO_queue_;
   std::atomic<bool> stop_;
   Stats* global_stats_ = nullptr;
   DuplicateManager* duplicate_manager_;
-  int execute_thread_num_ = 10;
-  static const int blucket_num_ = 1024;
-  int blucket_[blucket_num_];
-  std::condition_variable cv_;
-  std::mutex mutex_;
-
-  enum PrepareType {
-    Start_Prepare = 1,
-    Start_Execute = 2,
-    End_Prepare = 4,
-  };
-
-
-  std::vector<std::thread> prepare_thread_;
-  std::thread gc_thread_;
-  static const int mod = 2048;
-  std::mutex f_mutex_[mod], fd_mutex_[mod];
-  LockFreeQueue<Request> prepare_queue_;
-  LockFreeQueue<int64_t> gc_queue_;
-  typedef std::unique_ptr<std::promise<int>> PromiseType;
-  std::map<uint64_t, PromiseType> pre_[mod];
-
-  std::map<uint64_t, std::unique_ptr<std::future<int>>> pre_f_[mod];
-  std::map<uint64_t, int> flag_[mod];
-
-  std::map<uint64_t, std::unique_ptr<BatchUserRequest>> req_[mod];
-  std::unordered_map<
-      uint64_t,
-      std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>>
-      data_[mod];
-
 };
 
 }  // namespace resdb

Reply via email to