This is an automated email from the ASF dual-hosted git repository.
saipranav pushed a commit to branch QueccBranch
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/QueccBranch by this push:
new a519bd6b Added 2PL logic, need to add tests still
a519bd6b is described below
commit a519bd6b14e3ce431b7ce958b445cab1c4793ae9
Author: Saipranav Kotamreddy <[email protected]>
AuthorDate: Sat May 4 16:36:48 2024 -0700
Added 2PL logic, need to add tests still
---
executor/kv/quecc_executor.cpp | 2 +-
executor/kv/strict_executor.cpp | 136 +++++++++++++++++++++++++++++++++++++---
executor/kv/strict_executor.h | 11 +++-
3 files changed, 136 insertions(+), 13 deletions(-)
diff --git a/executor/kv/quecc_executor.cpp b/executor/kv/quecc_executor.cpp
index ce11ce7b..13c2d23a 100644
--- a/executor/kv/quecc_executor.cpp
+++ b/executor/kv/quecc_executor.cpp
@@ -207,8 +207,8 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
int batch_number = 0;
int txn_id=0;
// process through transactions
+ KVRequest kv_request;
for (const auto& sub_request : request.user_requests()) {
- KVRequest kv_request;
if (!kv_request.ParseFromString(sub_request.request().data())) {
LOG(ERROR) << "parse data fail";
return std::move(this->batch_response_);
diff --git a/executor/kv/strict_executor.cpp b/executor/kv/strict_executor.cpp
index b5c2e025..b4d73ab2 100644
--- a/executor/kv/strict_executor.cpp
+++ b/executor/kv/strict_executor.cpp
@@ -40,24 +40,140 @@ StrictExecutor::StrictExecutor(std::unique_ptr<Storage>
storage)
}
-void StrictExecutor::WorkerThread(){
-
+StrictExecutor::~StrictExecutor() {
+ is_stop_ = true;
+ empty_queue_.notify_all();
+ not_empty_queue_.notify_all();
+ lock_taken_.notify_all();
+ for (auto& th : thread_list_) {
+ if (th.joinable()) {
+ th.join();
+ }
+ }
}
+void StrictExecutor::WorkerThread(){
+ KVRequest kv_request;
+ std::set<std::string> held_locks;
+ bool lock_conflict;
+ std::string key;
+ while(true){
+ //Wait until request for thread to process is added and grab it
+ queue_lock_.lock();
+ while(request_queue.size()==0){
+ if (not_empty_queue_.wait_for(queue_lock_,
std::chrono::microseconds(100),
+ [this] { return is_stop_; })) {
+ queue_lock_.unlock();
+ return;
+ }
+ }
+ kv_request=request_queue_.front();
+ request_queue.pop();
+ queue_lock_.unlock();
-std::unique_ptr<BatchUserResponse> StrictExecutor::ExecuteBatch(
- const BatchUserRequest& request) {
- std::unique_ptr<BatchUserResponse> batch_response =
- std::make_unique<BatchUserResponse>();
- for (auto& sub_request : request.user_requests()) {
+ //Grab locks needed for request
+ map_lock_.lock();
+ //If multiple operations in kv_request
+ if(kv_request.ops_size()){
+ lock_conflict=true;
+ while(lock_conflict){
+ lock_conflict=false;
+ for(const auto& op : kv_request.ops()){
+ if(held_locks.contains(op.key())){
+ continue;
+ }
+ if(lock_map_.find(op.key()) == lock_map_.end() ||
lock_map_[op.key()]==0){
+ lock_map_[op.key()]=1;
+ held_locks.insert(op.key());
+ }
+ else{
+ lock_conflict=true;
+ break;
+ }
+ }
+ if(lock_conflict){
+ for(auto& key : held_locks_){
+ lock_map_[key]=0;
+ }
+ lock_taken_.notify_all();
+ //Put thread to sleep until another thread releases locks or time
expires
+ if (lock_taken_.wait_for(map_lock_,
std::chrono::microseconds(100),[this] { return is_stop_; })) {
+ map_lock_.unlock();
+ return;
+ }
+ }
+ }
+ }
+ //If only one operation in kv_request
+ else{
+ if(lock_map_.find(kv_request.key()) == lock_map_.end()){
+ lock_map_[kv_request.key()]=1;
+ held_locks.insert(kv_request.key());
+ }
+ else{
+ while(lock_map[kv_request.key()]==1){
+ if (lock_taken_.wait_for(map_lock_,
std::chrono::microseconds(100),[this] { return is_stop_; })) {
+ map_lock_.unlock();
+ return;
+ }
+ }
+ }
+ }
+ map_lock_.unlock();
+
+ //Once all locks are grabbed, execute transaction
std::unique_ptr<std::string> response =
- ExecuteData(sub_request.request().data());
+ ExecuteData(kv_request);
if (response == nullptr) {
response = std::make_unique<std::string>();
}
- batch_response->add_response()->swap(*response);
+
+ //Release locks that were grabbed
+ map_lock_.lock();
+ for(auto& key : held_locks_){
+ lock_map_[key]=0;
+ }
+ map_lock_.unlock();
+ lock_taken_.notify_all();
+
+ //Add response
+ response_lock_.lock();
+ this->batch_response->add_response()->swap(*response);
+ //Need to check all threads are done processing, and if all threads are
done + queue is empty, wake main thread
+ //Maybe check number of responses in batch response, if equals number of
requests, all requests are processed
+ if(this->batch_response_.get()->response_size()==request_count_){
+ empty_queue_.notify_all();
+ }
+ response_lock_.unlock();
}
+}
- return batch_response;
+std::unique_ptr<BatchUserResponse> StrictExecutor::ExecuteBatch(
+ const BatchUserRequest& request) {
+ this->batch_response_ = std::make_unique<BatchUserResponse>();
+ request_count_=0;
+ for (const auto& sub_request : request.user_requests()) {
+ KVRequest kv_request;
+ if (!kv_request.ParseFromString(sub_request.request().data())) {
+ LOG(ERROR) << "parse data fail";
+ return std::move(this->batch_response_);
+ }
+ queue_lock_.lock();
+ request_queue_.push(std::move(kv_request));
+ request_count_++;
+ queue_lock_.unlock();
+ //Might want to move this/add new condition to avoid unecessary broadcasts
+ not_empty_queue_.notify_all():
+ }
+ response_lock_.lock();
+ while(this->batch_response_.get()->response_size()!=request_count_){
+ if (empty_queue_.wait_for(queue_lock_, std::chrono::microseconds(100),
+ [this] { return is_stop_; })) {
+ response_lock_.unlock();
+ return std::move(this->batch_response_);
+ }
+ }
+ response_lock_.unlock();
+ return std::move(this->batch_response_);
}
std::unique_ptr<std::string> StrictExecutor::ExecuteData(
diff --git a/executor/kv/strict_executor.h b/executor/kv/strict_executor.h
index 5722a1c5..1cefe09b 100644
--- a/executor/kv/strict_executor.h
+++ b/executor/kv/strict_executor.h
@@ -58,11 +58,18 @@ class StrictExecutor : public TransactionManager {
private:
std::unique_ptr<Storage> storage_;
std::unordered_map<std::string, int> lock_map_;
- std::mutex lockMutex_;
- std::priority_queue request_queue_;
+ std::mutex queue_lock_;
+ std::mutex map_lock_;
+ std::mutex response_lock_;
+ std::queue request_queue_;
+ std::condition_variable empty_queue_;
+ std::condition_variable not_empty_queue_;
+ std::condition_variable lock_taken_;
bool is_stop_ = false;
int thread_count_;
+ int request_count_;
vector<thread> thread_list_;
+ std::unique_ptr<BatchUserResponse> batch_response_;
};
} // namespace resdb