This is an automated email from the ASF dual-hosted git repository.

saipranav pushed a commit to branch QueccBranch
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/QueccBranch by this push:
     new 27cf36d2 "Added LevelDB back to benchmark for more accurate comparing"
27cf36d2 is described below

commit 27cf36d21d37a352a78da2c46ce32a21f7f340f8
Author: Saipranav Kotamreddy <[email protected]>
AuthorDate: Sat Apr 27 14:34:21 2024 -0700

    "Added LevelDB back to benchmark for more accurate comparing"
---
 benchmark/protocols/quecc/BUILD               |  1 +
 benchmark/protocols/quecc/quecc_benchmark.cpp | 11 +++-
 executor/kv/kv_executor.cpp                   | 93 ++++++++++++++++++++-------
 executor/kv/kv_executor.h                     |  2 +
 executor/kv/quecc_executor.cpp                | 31 +++++++--
 5 files changed, 106 insertions(+), 32 deletions(-)

diff --git a/benchmark/protocols/quecc/BUILD b/benchmark/protocols/quecc/BUILD
index b268760e..65c627b1 100644
--- a/benchmark/protocols/quecc/BUILD
+++ b/benchmark/protocols/quecc/BUILD
@@ -12,5 +12,6 @@ cc_binary(
         "//platform/config:resdb_config_utils",
         "//proto/kv:kv_cc_proto",
         "//chain/storage:memory_db",
+        "//chain/storage:leveldb"
     ],
 )
diff --git a/benchmark/protocols/quecc/quecc_benchmark.cpp b/benchmark/protocols/quecc/quecc_benchmark.cpp
index ebe7e696..5e9f4fe8 100644
--- a/benchmark/protocols/quecc/quecc_benchmark.cpp
+++ b/benchmark/protocols/quecc/quecc_benchmark.cpp
@@ -28,6 +28,7 @@
 #include <chrono>
 #include <cstdint>
 #include <ctime>
+#include "chain/storage/leveldb.h"
 
 #include "chain/storage/memory_db.h"
 #include "executor/kv/kv_executor.h"
@@ -43,6 +44,7 @@ using resdb::KVExecutor;
 using resdb::KVOperation;
 using resdb::KVRequest;
 using resdb::storage::MemoryDB;
+using resdb::storage::NewResLevelDB;
 using resdb::QueccExecutor;
 using resdb::ResConfigData;
 using resdb::ResDBConfig;
@@ -128,6 +130,8 @@ BatchUserRequest RandomDistribution() {
 }
 
 int main(int argc, char** argv) {
+  //std::unique_ptr<Storage> storage = NewResLevelDB("/tmp/leveldb_test");
+
   vector<BatchUserRequest> equal_split_array;
   vector<BatchUserRequest> no_split_array;
   vector<BatchUserRequest> random_split_array;
@@ -136,9 +140,10 @@ int main(int argc, char** argv) {
     // no_split_array.push_back(NoDistribution());
     // random_split_array.push_back(RandomDistribution());
   }
-  KVExecutor kv_executor=KVExecutor(std::make_unique<MemoryDB>());
-
-  QueccExecutor quecc_executor=QueccExecutor(std::make_unique<MemoryDB>());
+  //KVExecutor kv_executor=KVExecutor(std::make_unique<MemoryDB>());
+  KVExecutor kv_executor=KVExecutor(NewResLevelDB("/tmp/leveldb_test"));
+  //QueccExecutor quecc_executor=QueccExecutor(std::make_unique<MemoryDB>());
+  QueccExecutor quecc_executor=QueccExecutor(NewResLevelDB("/tmp/leveldb_test"));
 
   std::unique_ptr<BatchUserResponse> response;
   // Equal Split Comparison
diff --git a/executor/kv/kv_executor.cpp b/executor/kv/kv_executor.cpp
index d63c6dfa..f89e0a2c 100644
--- a/executor/kv/kv_executor.cpp
+++ b/executor/kv/kv_executor.cpp
@@ -35,31 +35,61 @@ std::unique_ptr<std::string> KVExecutor::ExecuteData(
     LOG(ERROR) << "parse data fail";
     return nullptr;
   }
-
-  if (kv_request.cmd() == Operation::SET) {
-    Set(kv_request.key(), kv_request.value());
-  } else if (kv_request.cmd() == Operation::GET) {
-    kv_response.set_value(Get(kv_request.key()));
-  } else if (kv_request.cmd() == Operation::GETALLVALUES) {
-    kv_response.set_value(GetAllValues());
-  } else if (kv_request.cmd() == Operation::GETRANGE) {
-    kv_response.set_value(GetRange(kv_request.key(), kv_request.value()));
-  } else if (kv_request.cmd() == Operation::SET_WITH_VERSION) {
-    SetWithVersion(kv_request.key(), kv_request.value(), kv_request.version());
-  } else if (kv_request.cmd() == Operation::GET_WITH_VERSION) {
-    GetWithVersion(kv_request.key(), kv_request.version(),
-                   kv_response.mutable_value_info());
-  } else if (kv_request.cmd() == Operation::GET_ALL_ITEMS) {
-    GetAllItems(kv_response.mutable_items());
-  } else if (kv_request.cmd() == Operation::GET_KEY_RANGE) {
-    GetKeyRange(kv_request.min_key(), kv_request.max_key(),
-                kv_response.mutable_items());
-  } else if (kv_request.cmd() == Operation::GET_HISTORY) {
-    GetHistory(kv_request.key(), kv_request.min_version(),
-               kv_request.max_version(), kv_response.mutable_items());
-  } else if (kv_request.cmd() == Operation::GET_TOP) {
-    GetTopHistory(kv_request.key(), kv_request.top_number(),
+  if (kv_request.ops_size()) {
+    for(const auto& op : kv_request.ops()){
+      if (op.cmd() == Operation::SET) {
+        Set(op.key(), op.value());
+      } else if (op.cmd() == Operation::GET) {
+        kv_response.set_value(Get(op.key()));
+      } else if (op.cmd() == Operation::GETALLVALUES) {
+        kv_response.set_value(GetAllValues());
+      } else if (op.cmd() == Operation::GETRANGE) {
+        kv_response.set_value(GetRange(op.key(), op.value()));
+      } else if (op.cmd() == Operation::SET_WITH_VERSION) {
+        SetWithVersion(op.key(), op.value(), kv_request.version());
+      } else if (op.cmd() == Operation::GET_WITH_VERSION) {
+        GetWithVersion(op.key(), kv_request.version(),
+                      kv_response.mutable_value_info());
+      } else if (op.cmd() == Operation::GET_ALL_ITEMS) {
+        GetAllItems(kv_response.mutable_items());
+      } else if (op.cmd() == Operation::GET_KEY_RANGE) {
+        GetKeyRange(kv_request.min_key(), kv_request.max_key(),
+                    kv_response.mutable_items());
+      } else if (op.cmd() == Operation::GET_HISTORY) {
+        GetHistory(op.key(), kv_request.min_version(),
+                  kv_request.max_version(), kv_response.mutable_items());
+      } else if (op.cmd() == Operation::GET_TOP) {
+        GetTopHistory(op.key(), kv_request.top_number(),
+                      kv_response.mutable_items());
+      }
+    }
+  }
+  else{
+    if (kv_request.cmd() == Operation::SET) {
+      Set(kv_request.key(), kv_request.value());
+    } else if (kv_request.cmd() == Operation::GET) {
+      kv_response.set_value(Get(kv_request.key()));
+    } else if (kv_request.cmd() == Operation::GETALLVALUES) {
+      kv_response.set_value(GetAllValues());
+    } else if (kv_request.cmd() == Operation::GETRANGE) {
+      kv_response.set_value(GetRange(kv_request.key(), kv_request.value()));
+    } else if (kv_request.cmd() == Operation::SET_WITH_VERSION) {
+      SetWithVersion(kv_request.key(), kv_request.value(), kv_request.version());
+    } else if (kv_request.cmd() == Operation::GET_WITH_VERSION) {
+      GetWithVersion(kv_request.key(), kv_request.version(),
+                    kv_response.mutable_value_info());
+    } else if (kv_request.cmd() == Operation::GET_ALL_ITEMS) {
+      GetAllItems(kv_response.mutable_items());
+    } else if (kv_request.cmd() == Operation::GET_KEY_RANGE) {
+      GetKeyRange(kv_request.min_key(), kv_request.max_key(),
                   kv_response.mutable_items());
+    } else if (kv_request.cmd() == Operation::GET_HISTORY) {
+      GetHistory(kv_request.key(), kv_request.min_version(),
+                kv_request.max_version(), kv_response.mutable_items());
+    } else if (kv_request.cmd() == Operation::GET_TOP) {
+      GetTopHistory(kv_request.key(), kv_request.top_number(),
+                    kv_response.mutable_items());
+    }
   }
 
   std::unique_ptr<std::string> resp_str = std::make_unique<std::string>();
@@ -143,5 +173,20 @@ void KVExecutor::GetTopHistory(const std::string& key, int top_number,
     item->mutable_value_info()->set_version(it.second);
   }
 }
+std::unique_ptr<BatchUserResponse> KVExecutor::ExecuteBatch(
+    const BatchUserRequest& request) {
+  std::unique_ptr<BatchUserResponse> batch_response =
+      std::make_unique<BatchUserResponse>();
+  for (auto& sub_request : request.user_requests()) {
+    std::unique_ptr<std::string> response =
+        ExecuteData(sub_request.request().data());
+    if (response == nullptr) {
+      response = std::make_unique<std::string>();
+    }
+    batch_response->add_response()->swap(*response);
+  }
+
+  return batch_response;
+}
 
 }  // namespace resdb
diff --git a/executor/kv/kv_executor.h b/executor/kv/kv_executor.h
index 0fda88ae..3a078995 100644
--- a/executor/kv/kv_executor.h
+++ b/executor/kv/kv_executor.h
@@ -35,6 +35,8 @@ class KVExecutor : public TransactionManager {
   virtual ~KVExecutor() = default;
 
   std::unique_ptr<std::string> ExecuteData(const std::string& request) override;
+  std::unique_ptr<BatchUserResponse> ExecuteBatch(
+      const BatchUserRequest& request) override;
 
  protected:
   virtual void Set(const std::string& key, const std::string& value);
diff --git a/executor/kv/quecc_executor.cpp b/executor/kv/quecc_executor.cpp
index 727d895d..ce11ce7b 100644
--- a/executor/kv/quecc_executor.cpp
+++ b/executor/kv/quecc_executor.cpp
@@ -46,14 +46,14 @@
 using namespace std;
 using resdb::KVRequest;
 // 4 is hardcoded for thread count for now, eventually will use config value
-barrier execute_barrier(5);
+barrier execute_barrier(4);
 mutex results_mutex;
 
 namespace resdb {
 
 QueccExecutor::QueccExecutor(std::unique_ptr<Storage> storage)
     : storage_(std::move(storage)) {
-  thread_count_ = 5;
+  thread_count_ = 4;
   atomic<int> ready_planner_count_(0);
   std::unique_ptr<BatchUserResponse> batch_output =
       std::make_unique<BatchUserResponse>();
@@ -156,6 +156,7 @@ void QueccExecutor::PlannerThread(const int thread_number) {
     const int& range_being_executed = thread_number;
 
     execute_barrier.arrive_and_wait();
+    auto start_time = std::chrono::high_resolution_clock::now();
 
     // Executor
     for (int priority = 0; priority < thread_count_; priority++) {
@@ -170,6 +171,10 @@ void QueccExecutor::PlannerThread(const int thread_number) {
       }
       this->sorted_transactions_[priority][range_being_executed].clear();
     }
+    auto end_time = std::chrono::high_resolution_clock::now();
+    auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
+        end_time - start_time);
+    //printf("Time Taken 5: %d\n", (int)duration.count());
 
     // Lock used to minimize conflicts of adding to batchresponse
     results_mutex.lock();
@@ -187,6 +192,8 @@ void QueccExecutor::PlannerThread(const int thread_number) {
 
 std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
     const BatchUserRequest& request) {
+  auto start_time = std::chrono::high_resolution_clock::now();
+
   this->batch_response_ = std::make_unique<BatchUserResponse>();
   rid_to_range_.clear();
   transaction_tracker_.clear();
@@ -206,7 +213,6 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
       LOG(ERROR) << "parse data fail";
       return std::move(this->batch_response_);
     }
-
     // printf("txn id: %lu\n", kv_request.txn_id());
     // printf("kv_request size: %d\n", kv_request.ops_size());    
     if (kv_request.ops_size()) {
@@ -215,7 +221,7 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
         KVOperation newOp;
         newOp.transaction_number=txn_id;
         newOp.op= op;
-        operation_list_.push_back(newOp);
+        operation_list_.push_back(std::move(newOp));
         if(!key_weight_.count(op.key())){
           key_weight_[op.key()]=0;
         }
@@ -235,7 +241,7 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
       newOp.op.set_cmd(kv_request.cmd());
       newOp.op.set_key(kv_request.key());
       newOp.op.set_value(kv_request.value());
-      operation_list_.push_back(newOp);
+      operation_list_.push_back(std::move(newOp));
 
       if(kv_request.cmd() == Operation::SET){
         key_weight_[kv_request.key()]=key_weight_[kv_request.key()]+5;
@@ -248,7 +254,15 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
       transaction_tracker_[txn_id]=1;
     }
     txn_id++;
+    auto end_time = std::chrono::high_resolution_clock::now();
+    auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
+        end_time - start_time);
+    //printf("Time Taken 5: %d\n", (int)duration.count());
   }
+  auto end_time = std::chrono::high_resolution_clock::now();
+  auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
+      end_time - start_time);
+  printf("Time Taken 1: %d\n", (int)duration.count());
 
   int planner_vector_size =
       (operation_list_.size() + thread_count_ - 1) / thread_count_;
@@ -269,6 +283,10 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
   }
 */
   CreateRanges();
+  end_time = std::chrono::high_resolution_clock::now();
+  duration = std::chrono::duration_cast<std::chrono::microseconds>(
+      end_time - start_time);
+  printf("Time Taken 2: %d\n", (int)duration.count());
   
   ready_planner_count_.fetch_add(thread_count_);
   // Allows planner threads to start consuming
@@ -276,6 +294,9 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
     batch_ready_[i] = true;
   }
   cv_.notify_all();
+  end_time = std::chrono::high_resolution_clock::now();
+  duration = std::chrono::duration_cast<std::chrono::microseconds>(
+      end_time - start_time);
 
   // Wait for threads to finish to get batch response
   while (ready_planner_count_.load() != 0) {

Reply via email to