This is an automated email from the ASF dual-hosted git repository.

saipranav pushed a commit to branch QueccBranch
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/QueccBranch by this push:
     new e1b1acdf "Split quecc files into no wait and wait variants"
e1b1acdf is described below

commit e1b1acdfaf3d577aa7ae5e31e8a2cd03c963a7c4
Author: Saipranav Kotamreddy <[email protected]>
AuthorDate: Thu Apr 25 20:46:47 2024 -0700

    "Split quecc files into no wait and wait variants"
---
 .../kv/{quecc_executor.h => quecc_executor wait.h} |   0
 executor/kv/quecc_executor.cpp                     | 146 ++-------------------
 executor/kv/quecc_executor.h                       |   9 +-
 ...{quecc_executor.cpp => quecc_executor_wait.cpp} |  21 +--
 4 files changed, 25 insertions(+), 151 deletions(-)

diff --git a/executor/kv/quecc_executor.h b/executor/kv/quecc_executor wait.h
similarity index 100%
copy from executor/kv/quecc_executor.h
copy to executor/kv/quecc_executor wait.h
diff --git a/executor/kv/quecc_executor.cpp b/executor/kv/quecc_executor.cpp
index 83560fa8..727d895d 100644
--- a/executor/kv/quecc_executor.cpp
+++ b/executor/kv/quecc_executor.cpp
@@ -62,10 +62,6 @@ QueccExecutor::QueccExecutor(std::unique_ptr<Storage> storage)
     sorted_transactions_.push_back(
         std::vector<std::vector<KVOperation>>(thread_count_));
     batch_ready_.push_back(false);
-    multi_op_batches_.push_back(std::vector<KVRequest>());
-    multi_op_number_batches_.push_back(std::vector<int>());
-    wait_point_list_.push_back(std::vector<int>());
-    multi_op_ready_.store(false);
 
     std::thread planner(&QueccExecutor::PlannerThread, this, thread_number);
 
@@ -147,35 +143,6 @@ void QueccExecutor::PlannerThread(const int thread_number) {
       }
     }
 
-    if(multi_op_ready_.load()){
-      //Process multi_op distributions for wait positions
-      vector<KVRequest> multi_op_batch = multi_op_batches_[thread_number];
-      vector<int> multi_op_id = multi_op_number_batches_[thread_number];
-
-      //Look through each KV_Request, if a KV_Request is ever across 2 ranges
-      //it could cause a cascading abort and is therefore recorded as a wait point
-      int txn_id=0;
-      set<int> range_used_list = set<int>();
-      for(size_t i=0; i<multi_op_batch.size(); i++){
-        txn_id=multi_op_id[i];
-        range_used_list.clear();
-        for(const auto& request_operation: multi_op_batch[i].ops()){
-          range_used_list.insert(rid_to_range_.at(request_operation.key()));
-          if(range_used_list.size()>1){
-            wait_point_list_[thread_number].push_back(txn_id);
-            break;
-          }
-        }
-      }
-
-      ready_planner_count_.fetch_sub(1);
-      if (ready_planner_count_.load() == 0) {
-        cv2_.notify_all();
-      }
-      batch_ready_[thread_number] = false;
-      continue;
-    }
-
     // Planner
     std::vector<KVOperation> batch =
         std::move(batch_array_[thread_number]);
@@ -188,68 +155,13 @@ void QueccExecutor::PlannerThread(const int thread_number) {
     batch_array_[thread_number].clear();
     const int& range_being_executed = thread_number;
 
-    int wait_point_position=0;
-    int wait_point_vector=0;
-    vector<vector<int>> local_wait_points;
-    for(const auto& waitPointVector : wait_point_list_){
-      local_wait_points.push_back(waitPointVector);
-    }
-
     execute_barrier.arrive_and_wait();
 
     // Executor
     for (int priority = 0; priority < thread_count_; priority++) {
       std::vector<KVOperation>& range_ops =
           this->sorted_transactions_[priority][range_being_executed];
-      for (size_t op_count=0; op_count<range_ops.size(); op_count++) {
-        KVOperation op = range_ops[op_count];
-        //If past wait point, check for next wait point
-        while(op.transaction_number>wait_point_list_[wait_point_vector][wait_point_position]){
-          wait_point_position++;
-          if(wait_point_position==(int)wait_point_list_[wait_point_vector].size()){
-            wait_point_position=0;
-            wait_point_vector++;
-          }
-        }
-        //If at wait point, first time through, check operations to know if transaction is safe
-        //If it succeeds, all subsequent operations in txn can skip this step
-        //If it fails, then it goes into if, sses operations_checked is negative, and skips the operation, as txn was aborted
-        if(op.transaction_number==wait_point_list_[wait_point_vector][wait_point_position] && operations_checked_[op.transaction_number].load()!=transaction_tracker_[op.transaction_number]){
-          if(operations_checked_.find(op.transaction_number)==operations_checked_.end()){
-          }
-          if(operations_checked_[op.transaction_number].load()<0){
-            continue;
-          }
-          KVOperation current_op=op;
-          int current_txn_id=op.transaction_number;
-          int new_op_count=op_count;
-          bool same_txn=true;
-          //Verify all operations in txn
-          while(same_txn){ 
-            if(operations_checked_[current_txn_id].load()>=0 && current_op.op.cmd() == Operation::SET){   
-              if(!VerifyRequest(current_op.op.key(), current_op.op.value())){
-                //If verify fails, set to -2*op_count so it never becomes positive
-                operations_checked_[current_txn_id].store(-2*transaction_tracker_[current_txn_id]);
-              }
-            }
-            ++operations_checked_[current_txn_id];
-            new_op_count++;
-            current_op=range_ops[new_op_count];
-            //Continues looping through operations until next operation has a different txn id
-            if(current_op.transaction_number!=current_txn_id){
-              same_txn=false;
-              new_op_count--;
-            }
-          }
-          //Wait for all threads to finish checking ops from this txn
-          while(operations_checked_[current_txn_id].load()!=transaction_tracker_[current_txn_id] && operations_checked_[current_txn_id].load()>0){}
-          //Check again in case another thread had an abort, if so, skip to first operation of next txn id
-          if(operations_checked_[current_txn_id].load()<0){
-            op_count=new_op_count;
-            continue;
-          }
-        }
-        //After all checks are done, can execute operations
+      for (KVOperation& op : range_ops) {
         std::unique_ptr<std::string> response = ExecuteData(op);
         if (response == nullptr) {
           response = std::make_unique<std::string>();
@@ -280,10 +192,6 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
   transaction_tracker_.clear();
   operation_list_.clear();
   key_weight_.clear();
-  multi_op_transactions_.clear();
-  wait_point_list_.clear();
-  operations_checked_.clear();
-  multi_op_transactions_numbers_.clear();
   total_weight_=0;
 
   for (int i = 0; i < (int)batch_array_.size(); i++) {
@@ -302,10 +210,6 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
     // printf("txn id: %lu\n", kv_request.txn_id());
     // printf("kv_request size: %d\n", kv_request.ops_size());    
     if (kv_request.ops_size()) {
-      if(kv_request.ops_size()>1){
-        multi_op_transactions_.push_back(kv_request);
-        multi_op_transactions_numbers_.push_back(txn_id);
-      }
       transaction_tracker_[txn_id] = kv_request.ops_size();
       for(const auto& op : kv_request.ops()){
         KVOperation newOp;
@@ -343,7 +247,6 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
       }
       transaction_tracker_[txn_id]=1;
     }
-    operations_checked_[txn_id].store(0);
     txn_id++;
   }
 
@@ -356,42 +259,17 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
       batch_number++;
     }
   }
-  CreateRanges();
-
-  int multi_op_split_size = multi_op_transactions_numbers_.size()/thread_count_+1;
-  int count_per_split=0;
-  int op_batch_number=0;
-  //Send multi operation transactions to thread to gauge if they require waits
-  for(size_t i=0; i<multi_op_transactions_numbers_.size(); i++){
-    multi_op_ready_.store(true);
-    multi_op_batches_[op_batch_number].push_back(multi_op_transactions_[i]);
-    multi_op_number_batches_[op_batch_number].push_back(multi_op_transactions_numbers_[i]);
-    count_per_split++;
-    if(count_per_split>multi_op_split_size){
-      count_per_split=0;
-      op_batch_number++;
-    }
-  }
-
-  if(multi_op_ready_.load()){
-    ready_planner_count_.fetch_add(thread_count_);
-    // Allows planner threads to start consuming
-    for (int i = 0; i < thread_count_; i++) {
-      batch_ready_[i] = true;
-    }
-    cv_.notify_all();
-
-    // Wait for threads to finish to get batch response
-    while (ready_planner_count_.load() != 0) {
-      std::unique_lock<std::mutex> lk2(mutex2_);
-      if (cv2_.wait_for(lk2, std::chrono::microseconds(100),
-                        [this] { return is_stop_; })) {
-        return std::move(this->batch_response_);
-      }
-    }
-    multi_op_ready_.store(false);
+/*
+  // RIDs in hash map are now equal to which range they go into
+  int range_count = 0;
+  int range_size = ((rid_to_range_.size() - 1) / thread_count_) + 1;
+  for (const auto& key : rid_to_range_) {
+    rid_to_range_[key.first] = range_count / range_size;
+    range_count++;
   }
-
+*/
+  CreateRanges();
+  
   ready_planner_count_.fetch_add(thread_count_);
   // Allows planner threads to start consuming
   for (int i = 0; i < thread_count_; i++) {
@@ -488,4 +366,4 @@ std::string QueccExecutor::GetRange(const std::string& min_key,
   return storage_->GetRange(min_key, max_key);
 }
 
-}  // namespace resdb
+}  // namespace resdb
\ No newline at end of file
diff --git a/executor/kv/quecc_executor.h b/executor/kv/quecc_executor.h
index 1ea2bbab..8f18e9b5 100644
--- a/executor/kv/quecc_executor.h
+++ b/executor/kv/quecc_executor.h
@@ -97,13 +97,6 @@ class QueccExecutor : public TransactionManager {
   atomic<int> ready_planner_count_;
   std::unique_ptr<BatchUserResponse> batch_response_;
   std::unique_ptr<Storage> storage_;
-  vector<KVRequest> multi_op_transactions_;
-  vector<int> multi_op_transactions_numbers_;
-  vector<vector<KVRequest>> multi_op_batches_;
-  vector<vector<int>> multi_op_number_batches_;
-  atomic<bool> multi_op_ready_;
-  vector<vector<int>> wait_point_list_;
-  unordered_map<int, atomic<int>> operations_checked_;
 };
 
-}  // namespace resdb
+}  // namespace resdb
\ No newline at end of file
diff --git a/executor/kv/quecc_executor.cpp b/executor/kv/quecc_executor_wait.cpp
similarity index 94%
copy from executor/kv/quecc_executor.cpp
copy to executor/kv/quecc_executor_wait.cpp
index 83560fa8..9cae2bc9 100644
--- a/executor/kv/quecc_executor.cpp
+++ b/executor/kv/quecc_executor_wait.cpp
@@ -151,7 +151,7 @@ void QueccExecutor::PlannerThread(const int thread_number) {
       //Process multi_op distributions for wait positions
       vector<KVRequest> multi_op_batch = multi_op_batches_[thread_number];
       vector<int> multi_op_id = multi_op_number_batches_[thread_number];
-
+      std::cout<<"Hello 2.5 "<< multi_op_batch.size() << std::endl;
       //Look through each KV_Request, if a KV_Request is ever across 2 ranges
       //it could cause a cascading abort and is therefore recorded as a wait point
       int txn_id=0;
@@ -162,6 +162,7 @@ void QueccExecutor::PlannerThread(const int thread_number) {
         for(const auto& request_operation: multi_op_batch[i].ops()){
           range_used_list.insert(rid_to_range_.at(request_operation.key()));
           if(range_used_list.size()>1){
+            std::cout<<"Hello 5"<<std::endl;
             wait_point_list_[thread_number].push_back(txn_id);
             break;
           }
@@ -194,7 +195,7 @@ void QueccExecutor::PlannerThread(const int thread_number) {
     for(const auto& waitPointVector : wait_point_list_){
       local_wait_points.push_back(waitPointVector);
     }
-
+    std::cout<<"Hello 6"<<std::endl;
     execute_barrier.arrive_and_wait();
 
     // Executor
@@ -203,18 +204,19 @@ void QueccExecutor::PlannerThread(const int thread_number) {
           this->sorted_transactions_[priority][range_being_executed];
       for (size_t op_count=0; op_count<range_ops.size(); op_count++) {
         KVOperation op = range_ops[op_count];
-        //If past wait point, check for next wait point
-        while(op.transaction_number>wait_point_list_[wait_point_vector][wait_point_position]){
+          //If past wait point, check for next wait point
+        while(wait_point_vector<wait_point_list_.size() && op.transaction_number>wait_point_list_[wait_point_vector][wait_point_position]){
           wait_point_position++;
           if(wait_point_position==(int)wait_point_list_[wait_point_vector].size()){
             wait_point_position=0;
             wait_point_vector++;
           }
         }
+        //std::cout<<"Hello 7"<<std::endl;
         //If at wait point, first time through, check operations to know if transaction is safe
         //If it succeeds, all subsequent operations in txn can skip this step
         //If it fails, then it goes into if, sses operations_checked is negative, and skips the operation, as txn was aborted
-        if(op.transaction_number==wait_point_list_[wait_point_vector][wait_point_position] && operations_checked_[op.transaction_number].load()!=transaction_tracker_[op.transaction_number]){
+        if(wait_point_vector<wait_point_list_.size() && op.transaction_number==wait_point_list_[wait_point_vector][wait_point_position] && operations_checked_[op.transaction_number].load()!=transaction_tracker_[op.transaction_number]){
           if(operations_checked_.find(op.transaction_number)==operations_checked_.end()){
           }
           if(operations_checked_[op.transaction_number].load()<0){
@@ -275,6 +277,7 @@ void QueccExecutor::PlannerThread(const int thread_number) {
 
 std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
     const BatchUserRequest& request) {
+  std::cout<<"Hello"<<std::endl;
   this->batch_response_ = std::make_unique<BatchUserResponse>();
   rid_to_range_.clear();
   transaction_tracker_.clear();
@@ -300,7 +303,7 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
     }
 
     // printf("txn id: %lu\n", kv_request.txn_id());
-    // printf("kv_request size: %d\n", kv_request.ops_size());    
+    //printf("kv_request size: %d\n", kv_request.ops_size());    
     if (kv_request.ops_size()) {
       if(kv_request.ops_size()>1){
         multi_op_transactions_.push_back(kv_request);
@@ -346,7 +349,7 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
     operations_checked_[txn_id].store(0);
     txn_id++;
   }
-
+  std::cout<<"Hello 2 "<<operation_list_.size() << " "<< multi_op_transactions_.size()<<std::endl;
   int planner_vector_size =
       (operation_list_.size() + thread_count_ - 1) / thread_count_;
   for (const auto& op : operation_list_) {
@@ -372,7 +375,7 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
       op_batch_number++;
     }
   }
-
+  std::cout<<"Hello 3 "<< multi_op_batches_[0].size()<<std::endl;
   if(multi_op_ready_.load()){
     ready_planner_count_.fetch_add(thread_count_);
     // Allows planner threads to start consuming
@@ -391,7 +394,7 @@ std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
     }
     multi_op_ready_.store(false);
   }
-
+  std::cout<<"Hello 4 "<<std::endl;
   ready_planner_count_.fetch_add(thread_count_);
   // Allows planner threads to start consuming
   for (int i = 0; i < thread_count_; i++) {

Reply via email to