This is an automated email from the ASF dual-hosted git repository.

saipranav pushed a commit to branch QueccBranch
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/QueccBranch by this push:
     new a7596f8a "Fixed benchmark issues, added new experiments"
a7596f8a is described below

commit a7596f8ae9a39baef0f11c1445b4e8f5adb4e705
Author: Saipranav Kotamreddy <[email protected]>
AuthorDate: Wed May 22 17:19:36 2024 -0700

    "Fixed benchmark issues, added new experiments"
---
 benchmark/protocols/quecc/BUILD               |  4 +-
 benchmark/protocols/quecc/quecc_benchmark.cpp | 38 ++++++++++----
 chain/storage/leveldb.cpp                     | 15 +++++-
 executor/kv/BUILD                             | 13 +++++
 executor/kv/quecc_executor.cpp                | 27 ----------
 executor/kv/strict_executor.cpp               | 71 ++++++++++++++-------------
 executor/kv/strict_executor.h                 | 19 +++++--
 7 files changed, 111 insertions(+), 76 deletions(-)

diff --git a/benchmark/protocols/quecc/BUILD b/benchmark/protocols/quecc/BUILD
index 65c627b1..c0d20ecc 100644
--- a/benchmark/protocols/quecc/BUILD
+++ b/benchmark/protocols/quecc/BUILD
@@ -9,9 +9,11 @@ cc_binary(
         "//chain/state:chain_state",
         "//executor/kv:kv_executor",
         "//executor/kv:quecc_executor",
+        "//executor/kv:strict_executor",
         "//platform/config:resdb_config_utils",
         "//proto/kv:kv_cc_proto",
         "//chain/storage:memory_db",
-        "//chain/storage:leveldb"
+        "//chain/storage:leveldb",
+        "//chain/storage:rocksdb"
     ],
 )
diff --git a/benchmark/protocols/quecc/quecc_benchmark.cpp b/benchmark/protocols/quecc/quecc_benchmark.cpp
index 5e9f4fe8..6a6d5209 100644
--- a/benchmark/protocols/quecc/quecc_benchmark.cpp
+++ b/benchmark/protocols/quecc/quecc_benchmark.cpp
@@ -29,10 +29,12 @@
 #include <cstdint>
 #include <ctime>
 #include "chain/storage/leveldb.h"
+#include "chain/storage/rocksdb.h"
 
 #include "chain/storage/memory_db.h"
 #include "executor/kv/kv_executor.h"
 #include "executor/kv/quecc_executor.h"
+#include "executor/kv/strict_executor.h"
 #include "platform/config/resdb_config_utils.h"
 #include "platform/proto/resdb.pb.h"
 #include "proto/kv/kv.pb.h"
@@ -45,7 +47,9 @@ using resdb::KVOperation;
 using resdb::KVRequest;
 using resdb::storage::MemoryDB;
 using resdb::storage::NewResLevelDB;
+using resdb::storage::NewResRocksDB;
 using resdb::QueccExecutor;
+using resdb::StrictExecutor;
 using resdb::ResConfigData;
 using resdb::ResDBConfig;
 using resdb::Storage;
@@ -63,7 +67,7 @@ BatchUserRequest EqualDistribution() {
     // add transaction
     KVRequest request;
 
-    for (int j = 0; j < 100; j++) {
+    for (int j = 0; j < 1000; j++) {
       // add operation
       resdb::Operation* op = request.add_ops();
       op->set_cmd(resdb::Operation::SET);
@@ -141,15 +145,20 @@ int main(int argc, char** argv) {
     // random_split_array.push_back(RandomDistribution());
   }
   //KVExecutor kv_executor=KVExecutor(std::make_unique<MemoryDB>());
-  KVExecutor kv_executor=KVExecutor(NewResLevelDB("/tmp/leveldb_test"));
+  //KVExecutor kv_executor=KVExecutor(NewResLevelDB("/tmp/leveldb_test1"));
+  
   //QueccExecutor quecc_executor=QueccExecutor(std::make_unique<MemoryDB>());
-  QueccExecutor 
quecc_executor=QueccExecutor(NewResLevelDB("/tmp/leveldb_test"));
+  //QueccExecutor 
quecc_executor=QueccExecutor(NewResLevelDB("/tmp/leveldb_test7", std::nullopt));
+
+   //StrictExecutor 
strict_executor=StrictExecutor(std::make_unique<MemoryDB>());
+  StrictExecutor 
strict_executor=StrictExecutor(NewResLevelDB("/tmp/leveldb_test8", 
std::nullopt));
+  std::cout<<"Hello 2"<<std::endl;
 
   std::unique_ptr<BatchUserResponse> response;
   // Equal Split Comparison
   printf("Equal Split Times\n");
 
-  auto start_time = std::chrono::high_resolution_clock::now();
+  /*auto start_time = std::chrono::high_resolution_clock::now();
   for (BatchUserRequest equal_split : equal_split_array) {
     response = quecc_executor.ExecuteBatch(equal_split);
   }
@@ -157,14 +166,25 @@ int main(int argc, char** argv) {
   auto end_time = std::chrono::high_resolution_clock::now();
   auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
       end_time - start_time);
-  printf("Quecc Time Taken: %d\n", (int)duration.count());
-  start_time = std::chrono::high_resolution_clock::now();
+  printf("Quecc Time Taken: %d\n", (int)duration.count());*/
+
+  /*auto start_time = std::chrono::high_resolution_clock::now();
 
   for (BatchUserRequest equal_split : equal_split_array) {
     response = kv_executor.ExecuteBatch(equal_split);
   }
-  end_time = std::chrono::high_resolution_clock::now();
-  duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time -
+  auto end_time = std::chrono::high_resolution_clock::now();
+  auto duration = 
std::chrono::duration_cast<std::chrono::microseconds>(end_time -
                                                                    start_time);
-  printf("KV Time Taken: %d\n", (int)duration.count());
+  printf("KV Time Taken: %d\n", (int)duration.count());*/
+
+  auto start_time = std::chrono::high_resolution_clock::now();
+  for (BatchUserRequest equal_split : equal_split_array) {
+    response = strict_executor.ExecuteBatch(equal_split);
+  }
+
+  auto end_time = std::chrono::high_resolution_clock::now();
+  auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
+      end_time - start_time);
+  printf("2PL Time Taken: %d\n", (int)duration.count());
 }
\ No newline at end of file
diff --git a/chain/storage/leveldb.cpp b/chain/storage/leveldb.cpp
index 2cc96fb5..6fabbea7 100644
--- a/chain/storage/leveldb.cpp
+++ b/chain/storage/leveldb.cpp
@@ -23,6 +23,8 @@
 
 #include "chain/storage/proto/kv.pb.h"
 
+std::mutex batch_mutex;
+
 namespace resdb {
 namespace storage {
 
@@ -77,18 +79,29 @@ ResLevelDB::~ResLevelDB() {
 }
 
 int ResLevelDB::SetValue(const std::string& key, const std::string& value) {
+  leveldb::WriteBatch localBatch;
+  batch_mutex.lock();
   batch_.Put(key, value);
 
   if (batch_.ApproximateSize() >= write_batch_size_) {
-    leveldb::Status status = db_->Write(leveldb::WriteOptions(), &batch_);
+    localBatch.Append(batch_);
+    //batch_.Clear();
+    //batch_mutex.unlock();
+    leveldb::Status status = db_->Write(leveldb::WriteOptions(), &localBatch);
     if (status.ok()) {
+      //batch_mutex.lock();
       batch_.Clear();
+      batch_mutex.unlock();
       return 0;
     } else {
       LOG(ERROR) << "flush buffer fail:" << status.ToString();
+      //batch_mutex.lock();
+      batch_.Append(localBatch);
+      batch_mutex.unlock();
       return -1;
     }
   }
+  batch_mutex.unlock();
   return 0;
 }
 
diff --git a/executor/kv/BUILD b/executor/kv/BUILD
index e05aad22..3005b3e9 100644
--- a/executor/kv/BUILD
+++ b/executor/kv/BUILD
@@ -46,6 +46,19 @@ cc_library(
     ],
 )
 
+cc_library(
+    name = "strict_executor",
+    srcs = ["strict_executor.cpp"],
+    hdrs = ["strict_executor.h"],
+    deps = [
+        "//chain/storage",
+        "//common:comm",
+        "//executor/common:transaction_manager",
+        "//platform/config:resdb_config_utils",
+        "//proto/kv:kv_cc_proto",
+    ],
+)
+
 cc_test(
     name = "kv_executor_test",
     srcs = ["kv_executor_test.cpp"],
diff --git a/executor/kv/quecc_executor.cpp b/executor/kv/quecc_executor.cpp
index 13c2d23a..a1959a57 100644
--- a/executor/kv/quecc_executor.cpp
+++ b/executor/kv/quecc_executor.cpp
@@ -156,7 +156,6 @@ void QueccExecutor::PlannerThread(const int thread_number) {
     const int& range_being_executed = thread_number;
 
     execute_barrier.arrive_and_wait();
-    auto start_time = std::chrono::high_resolution_clock::now();
 
     // Executor
     for (int priority = 0; priority < thread_count_; priority++) {
@@ -171,10 +170,6 @@ void QueccExecutor::PlannerThread(const int thread_number) {
       }
       this->sorted_transactions_[priority][range_being_executed].clear();
     }
-    auto end_time = std::chrono::high_resolution_clock::now();
-    auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
-        end_time - start_time);
-    //printf("Time Taken 5: %d\n", (int)duration.count());
 
     // Lock used to minimize conflicts of adding to batchresponse
     results_mutex.lock();
@@ -192,7 +187,6 @@ void QueccExecutor::PlannerThread(const int thread_number) {
 
 std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
     const BatchUserRequest& request) {
-  auto start_time = std::chrono::high_resolution_clock::now();
 
   this->batch_response_ = std::make_unique<BatchUserResponse>();
   rid_to_range_.clear();
@@ -254,15 +248,7 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
       transaction_tracker_[txn_id]=1;
     }
     txn_id++;
-    auto end_time = std::chrono::high_resolution_clock::now();
-    auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
-        end_time - start_time);
-    //printf("Time Taken 5: %d\n", (int)duration.count());
   }
-  auto end_time = std::chrono::high_resolution_clock::now();
-  auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
-      end_time - start_time);
-  printf("Time Taken 1: %d\n", (int)duration.count());
 
   int planner_vector_size =
       (operation_list_.size() + thread_count_ - 1) / thread_count_;
@@ -283,20 +269,12 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
   }
 */
   CreateRanges();
-  end_time = std::chrono::high_resolution_clock::now();
-  duration = std::chrono::duration_cast<std::chrono::microseconds>(
-      end_time - start_time);
-  printf("Time Taken 2: %d\n", (int)duration.count());
-  
   ready_planner_count_.fetch_add(thread_count_);
   // Allows planner threads to start consuming
   for (int i = 0; i < thread_count_; i++) {
     batch_ready_[i] = true;
   }
   cv_.notify_all();
-  end_time = std::chrono::high_resolution_clock::now();
-  duration = std::chrono::duration_cast<std::chrono::microseconds>(
-      end_time - start_time);
 
   // Wait for threads to finish to get batch response
   while (ready_planner_count_.load() != 0) {
@@ -325,11 +303,6 @@ std::unique_ptr<std::string> QueccExecutor::ExecuteData(const KVOperation& oper)
   if (!kv_response.SerializeToString(resp_str.get())) {
     return nullptr;
   }
-  // Add some way to decrement number of KVOperations for the txn in
-  // transaction_tracker here, could have map of int->atomic_int
-  // If a txn has 0 KVOperations remaining, remove it
-  // Any txns left in map by end had some issue/aborted and we know which
-  // txns must be undone
   return resp_str;
 }
 
diff --git a/executor/kv/strict_executor.cpp b/executor/kv/strict_executor.cpp
index b4d73ab2..1d4fac0d 100644
--- a/executor/kv/strict_executor.cpp
+++ b/executor/kv/strict_executor.cpp
@@ -17,25 +17,25 @@
  * under the License.
  */
 
-#include "executor/kv/kv_executor.h"
+#include "executor/kv/strict_executor.h"
 
 #include <glog/logging.h>
-
+using namespace std;
+using resdb::KVRequest;
 namespace resdb {
 
 StrictExecutor::StrictExecutor(std::unique_ptr<Storage> storage)
     : storage_(std::move(storage)) {
       int thread_count_=4;
       for(int i=0; i<thread_count_;i++){
-        std::thread planner(&StrictExecutor::WorkerThread, this, NULL);
+        std::thread worker(&StrictExecutor::WorkerThread, this);
 
         // thread pinning
         cpu_set_t cpuset;
         CPU_ZERO(&cpuset);
-        CPU_SET(thread_number, &cpuset);
-        int status = pthread_setaffinity_np(planner.native_handle(),
-                                            sizeof(cpu_set_t), &cpuset);
-        thread_list_.push_back(move(planner));
+        CPU_SET(i, &cpuset);
+        //int status = 
pthread_setaffinity_np(planner.native_handle(),sizeof(cpu_set_t), &cpuset);
+        thread_list_.push_back(move(worker));
       }
 
     }
@@ -56,22 +56,25 @@ void StrictExecutor::WorkerThread(){
   std::set<std::string> held_locks;
   bool lock_conflict;
   std::string key;
+  std::unique_lock<std::mutex> queue_lock(queue_lock_);
+  std::unique_lock<std::mutex> map_lock(map_lock_);
+  std::unique_lock<std::mutex> response_lock(response_lock_);
   while(true){
     //Wait until request for thread to process is added and grab it
-    queue_lock_.lock();
-    while(request_queue.size()==0){
-      if (not_empty_queue_.wait_for(queue_lock_, 
std::chrono::microseconds(100),
-        [this] { return is_stop_; })) {
+    queue_lock.lock();
+    while(request_queue_.size()==0){
+      if (not_empty_queue_.wait_for(queue_lock, std::chrono::microseconds(100),
+        [&] { return is_stop_; })) {
         queue_lock_.unlock();
         return;
       }
     }
     kv_request=request_queue_.front();
-    request_queue.pop();
-    queue_lock_.unlock();
+    request_queue_.pop();
+    queue_lock.unlock();
 
     //Grab locks needed for request
-    map_lock_.lock();
+    map_lock.lock();
     //If multiple operations in kv_request
     if(kv_request.ops_size()){
       lock_conflict=true;
@@ -91,13 +94,13 @@ void StrictExecutor::WorkerThread(){
           }
         }
         if(lock_conflict){
-          for(auto& key : held_locks_){
+          for(auto& key : held_locks){
             lock_map_[key]=0;
           }
           lock_taken_.notify_all();
           //Put thread to sleep until another thread releases locks or time 
expires
-          if (lock_taken_.wait_for(map_lock_, 
std::chrono::microseconds(100),[this] { return is_stop_; })) {
-            map_lock_.unlock();
+          if (lock_taken_.wait_for(map_lock, 
std::chrono::microseconds(100),[this] { return is_stop_; })) {
+            map_lock.unlock();
             return;
           }
         }
@@ -110,34 +113,34 @@ void StrictExecutor::WorkerThread(){
         held_locks.insert(kv_request.key());
       }
       else{
-        while(lock_map[kv_request.key()]==1){
-          if (lock_taken_.wait_for(map_lock_, 
std::chrono::microseconds(100),[this] { return is_stop_; })) {
-            map_lock_.unlock();
+        while(lock_map_[kv_request.key()]==1){
+          if (lock_taken_.wait_for(map_lock, 
std::chrono::microseconds(100),[this] { return is_stop_; })) {
+            map_lock.unlock();
             return;
           }
         }
       }
     }
-    map_lock_.unlock();
+    map_lock.unlock();
 
     //Once all locks are grabbed, execute transaction
     std::unique_ptr<std::string> response =
-        ExecuteData(kv_request);
+      ExecuteData(kv_request);
     if (response == nullptr) {
       response = std::make_unique<std::string>();
     }
 
     //Release locks that were grabbed
-    map_lock_.lock();
-    for(auto& key : held_locks_){
+    map_lock.lock();
+    for(auto& key : held_locks){
       lock_map_[key]=0;
     }
-    map_lock_.unlock();
+    map_lock.unlock();
     lock_taken_.notify_all();
 
     //Add response
     response_lock_.lock();
-    this->batch_response->add_response()->swap(*response);
+    this->batch_response_->add_response()->swap(*response);
     //Need to check all threads are done processing, and if all threads are 
done + queue is empty, wake main thread
     //Maybe check number of responses in batch response, if equals number of 
requests, all requests are processed    
     if(this->batch_response_.get()->response_size()==request_count_){
@@ -151,28 +154,30 @@ std::unique_ptr<BatchUserResponse> StrictExecutor::ExecuteBatch(
     const BatchUserRequest& request) {
   this->batch_response_ = std::make_unique<BatchUserResponse>();
   request_count_=0;
+  std::unique_lock<std::mutex> queue_lock(queue_lock_);
   for (const auto& sub_request : request.user_requests()) {
     KVRequest kv_request;
     if (!kv_request.ParseFromString(sub_request.request().data())) {
       LOG(ERROR) << "parse data fail";
       return std::move(this->batch_response_);
     }
-    queue_lock_.lock();
+    queue_lock.lock();
     request_queue_.push(std::move(kv_request));
     request_count_++;
-    queue_lock_.unlock();
+    queue_lock.unlock();
     //Might want to move this/add new condition to avoid unecessary broadcasts
-    not_empty_queue_.notify_all():
+    not_empty_queue_.notify_all();
   }
-  response_lock_.lock();
+  std::unique_lock<std::mutex> response_lock(response_lock_);
+  response_lock.lock();
   while(this->batch_response_.get()->response_size()!=request_count_){
-    if (empty_queue_.wait_for(queue_lock_, std::chrono::microseconds(100),
+    if (empty_queue_.wait_for(queue_lock, std::chrono::microseconds(100),
         [this] { return is_stop_; })) {
-        response_lock_.unlock();
+        response_lock.unlock();
       return std::move(this->batch_response_);
     }
   }
-  response_lock_.unlock();
+  response_lock.unlock();
   return std::move(this->batch_response_);
 }
 
diff --git a/executor/kv/strict_executor.h b/executor/kv/strict_executor.h
index 1cefe09b..c973235f 100644
--- a/executor/kv/strict_executor.h
+++ b/executor/kv/strict_executor.h
@@ -21,20 +21,29 @@
 
 #include <map>
 #include <optional>
+#include <atomic>
+#include <barrier>
+#include <list>
+#include <map>
+#include <memory>
+#include <optional>
+#include <thread>
 #include <unordered_map>
-
+#include <vector>
+#include <queue>
+#include <condition_variable>
 #include "chain/storage/storage.h"
 #include "executor/common/transaction_manager.h"
 #include "proto/kv/kv.pb.h"
-
+using namespace std;
 namespace resdb {
 
 class StrictExecutor : public TransactionManager {
  public:
   StrictExecutor(std::unique_ptr<Storage> storage);
-  virtual ~StrictExecutor() = default;
+  virtual ~StrictExecutor();
 
-  std::unique_ptr<std::string> ExecuteData(const std::string& request) 
override;
+  std::unique_ptr<std::string> ExecuteData(const KVRequest& kv_request);
   std::unique_ptr<BatchUserResponse> ExecuteBatch(
       const BatchUserRequest& request) override;
 
@@ -61,7 +70,7 @@ class StrictExecutor : public TransactionManager {
   std::mutex queue_lock_;
   std::mutex map_lock_;
   std::mutex response_lock_;
-  std::queue request_queue_;
+  std::queue<KVRequest> request_queue_;
   std::condition_variable empty_queue_;
   std::condition_variable not_empty_queue_;
   std::condition_variable lock_taken_;

Reply via email to