This is an automated email from the ASF dual-hosted git repository.

saipranav pushed a commit to branch QueccBranch
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/QueccBranch by this push:
     new a519bd6b Added 2PL logic, need to add tests still
a519bd6b is described below

commit a519bd6b14e3ce431b7ce958b445cab1c4793ae9
Author: Saipranav Kotamreddy <[email protected]>
AuthorDate: Sat May 4 16:36:48 2024 -0700

    Added 2PL logic, need to add tests still
---
 executor/kv/quecc_executor.cpp  |   2 +-
 executor/kv/strict_executor.cpp | 136 +++++++++++++++++++++++++++++++++++++---
 executor/kv/strict_executor.h   |  11 +++-
 3 files changed, 136 insertions(+), 13 deletions(-)

diff --git a/executor/kv/quecc_executor.cpp b/executor/kv/quecc_executor.cpp
index ce11ce7b..13c2d23a 100644
--- a/executor/kv/quecc_executor.cpp
+++ b/executor/kv/quecc_executor.cpp
@@ -207,8 +207,8 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
   int batch_number = 0;
   int txn_id=0;
   // process through transactions
+  KVRequest kv_request;
   for (const auto& sub_request : request.user_requests()) {
-    KVRequest kv_request;
     if (!kv_request.ParseFromString(sub_request.request().data())) {
       LOG(ERROR) << "parse data fail";
       return std::move(this->batch_response_);
diff --git a/executor/kv/strict_executor.cpp b/executor/kv/strict_executor.cpp
index b5c2e025..b4d73ab2 100644
--- a/executor/kv/strict_executor.cpp
+++ b/executor/kv/strict_executor.cpp
@@ -40,24 +40,140 @@ StrictExecutor::StrictExecutor(std::unique_ptr<Storage> storage)
 
     }
 
-void StrictExecutor::WorkerThread(){
-  
+StrictExecutor::~StrictExecutor() {
+  is_stop_ = true;
+  empty_queue_.notify_all();
+  not_empty_queue_.notify_all();
+  lock_taken_.notify_all();
+  for (auto& th : thread_list_) {
+    if (th.joinable()) {
+      th.join();
+    }
+  }
 }
+void StrictExecutor::WorkerThread(){
+  KVRequest kv_request;
+  std::set<std::string> held_locks;
+  bool lock_conflict;
+  std::string key;
+  while(true){
+    //Wait until request for thread to process is added and grab it
+    queue_lock_.lock();
+    while(request_queue.size()==0){
+      if (not_empty_queue_.wait_for(queue_lock_, std::chrono::microseconds(100),
+        [this] { return is_stop_; })) {
+        queue_lock_.unlock();
+        return;
+      }
+    }
+    kv_request=request_queue_.front();
+    request_queue.pop();
+    queue_lock_.unlock();
 
-std::unique_ptr<BatchUserResponse> StrictExecutor::ExecuteBatch(
-    const BatchUserRequest& request) {
-  std::unique_ptr<BatchUserResponse> batch_response =
-      std::make_unique<BatchUserResponse>();
-  for (auto& sub_request : request.user_requests()) {
+    //Grab locks needed for request
+    map_lock_.lock();
+    //If multiple operations in kv_request
+    if(kv_request.ops_size()){
+      lock_conflict=true;
+      while(lock_conflict){
+        lock_conflict=false;
+        for(const auto& op : kv_request.ops()){
+          if(held_locks.contains(op.key())){
+            continue;
+          }
+          if(lock_map_.find(op.key()) == lock_map_.end() || lock_map_[op.key()]==0){
+            lock_map_[op.key()]=1;
+            held_locks.insert(op.key());
+          }
+          else{
+            lock_conflict=true;
+            break;
+          }
+        }
+        if(lock_conflict){
+          for(auto& key : held_locks_){
+            lock_map_[key]=0;
+          }
+          lock_taken_.notify_all();
+          //Put thread to sleep until another thread releases locks or time expires
+          if (lock_taken_.wait_for(map_lock_, std::chrono::microseconds(100),[this] { return is_stop_; })) {
+            map_lock_.unlock();
+            return;
+          }
+        }
+      }
+    }
+    //If only one operation in kv_request
+    else{
+      if(lock_map_.find(kv_request.key()) == lock_map_.end()){
+        lock_map_[kv_request.key()]=1;
+        held_locks.insert(kv_request.key());
+      }
+      else{
+        while(lock_map[kv_request.key()]==1){
+          if (lock_taken_.wait_for(map_lock_, std::chrono::microseconds(100),[this] { return is_stop_; })) {
+            map_lock_.unlock();
+            return;
+          }
+        }
+      }
+    }
+    map_lock_.unlock();
+
+    //Once all locks are grabbed, execute transaction
     std::unique_ptr<std::string> response =
-        ExecuteData(sub_request.request().data());
+        ExecuteData(kv_request);
     if (response == nullptr) {
       response = std::make_unique<std::string>();
     }
-    batch_response->add_response()->swap(*response);
+
+    //Release locks that were grabbed
+    map_lock_.lock();
+    for(auto& key : held_locks_){
+      lock_map_[key]=0;
+    }
+    map_lock_.unlock();
+    lock_taken_.notify_all();
+
+    //Add response
+    response_lock_.lock();
+    this->batch_response->add_response()->swap(*response);
+    //Need to check all threads are done processing, and if all threads are done + queue is empty, wake main thread
+    //Maybe check number of responses in batch response, if equals number of requests, all requests are processed
+    if(this->batch_response_.get()->response_size()==request_count_){
+      empty_queue_.notify_all();
+    }
+    response_lock_.unlock();
   }
+}
 
-  return batch_response;
+std::unique_ptr<BatchUserResponse> StrictExecutor::ExecuteBatch(
+    const BatchUserRequest& request) {
+  this->batch_response_ = std::make_unique<BatchUserResponse>();
+  request_count_=0;
+  for (const auto& sub_request : request.user_requests()) {
+    KVRequest kv_request;
+    if (!kv_request.ParseFromString(sub_request.request().data())) {
+      LOG(ERROR) << "parse data fail";
+      return std::move(this->batch_response_);
+    }
+    queue_lock_.lock();
+    request_queue_.push(std::move(kv_request));
+    request_count_++;
+    queue_lock_.unlock();
+    //Might want to move this/add new condition to avoid unnecessary broadcasts
+    not_empty_queue_.notify_all():
+  }
+  response_lock_.lock();
+  while(this->batch_response_.get()->response_size()!=request_count_){
+    if (empty_queue_.wait_for(queue_lock_, std::chrono::microseconds(100),
+        [this] { return is_stop_; })) {
+        response_lock_.unlock();
+      return std::move(this->batch_response_);
+    }
+  }
+  response_lock_.unlock();
+  return std::move(this->batch_response_);
 }
 
 std::unique_ptr<std::string> StrictExecutor::ExecuteData(
diff --git a/executor/kv/strict_executor.h b/executor/kv/strict_executor.h
index 5722a1c5..1cefe09b 100644
--- a/executor/kv/strict_executor.h
+++ b/executor/kv/strict_executor.h
@@ -58,11 +58,18 @@ class StrictExecutor : public TransactionManager {
  private:
   std::unique_ptr<Storage> storage_;
   std::unordered_map<std::string, int> lock_map_;
-  std::mutex lockMutex_;
-  std::priority_queue request_queue_;
+  std::mutex queue_lock_;
+  std::mutex map_lock_;
+  std::mutex response_lock_;
+  std::queue request_queue_;
+  std::condition_variable empty_queue_;
+  std::condition_variable not_empty_queue_;
+  std::condition_variable lock_taken_;
   bool is_stop_ = false;
   int thread_count_;
+  int request_count_;
   vector<thread> thread_list_;
+  std::unique_ptr<BatchUserResponse> batch_response_;
 };
 
 }  // namespace resdb

Reply via email to