This is an automated email from the ASF dual-hosted git repository.

saipranav pushed a commit to branch QueccBranch
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/QueccBranch by this push:
     new d891b4ae "Added processing of wait points and verifies"
d891b4ae is described below

commit d891b4aeb7df5332aa5b14c26e9e2300145a8cdf
Author: Saipranav Kotamreddy <[email protected]>
AuthorDate: Mon Mar 4 09:54:13 2024 -0800

    "Added processing of wait points and verifies"
---
 executor/kv/quecc_executor.cpp | 60 +++++++++++++++++++++++++++++++++++++++++-
 executor/kv/quecc_executor.h   |  1 +
 2 files changed, 60 insertions(+), 1 deletion(-)

diff --git a/executor/kv/quecc_executor.cpp b/executor/kv/quecc_executor.cpp
index f4bfe5fe..d92b3a3b 100644
--- a/executor/kv/quecc_executor.cpp
+++ b/executor/kv/quecc_executor.cpp
@@ -188,13 +188,68 @@ void QueccExecutor::PlannerThread(const int 
thread_number) {
     batch_array_[thread_number].clear();
     const int& range_being_executed = thread_number;
 
+    int wait_point_position=0;
+    int wait_point_vector=0;
+    vector<vector<int>> local_wait_points;
+    for(const auto& waitPointVector : wait_point_list_){
+      local_wait_points.push_back(waitPointVector);
+    }
+
     execute_barrier.arrive_and_wait();
 
     // Executor
     for (int priority = 0; priority < thread_count_; priority++) {
       std::vector<KVOperation>& range_ops =
           this->sorted_transactions_[priority][range_being_executed];
-      for (KVOperation& op : range_ops) {
+      for (int op_count; op_count<range_ops.size(); op_count++) {
+        KVOperation op = range_ops[op_count];
+        //If past wait point, check for next wait point
+        
while(op.transaction_number>wait_point_list_[wait_point_vector][wait_point_position]){
+          wait_point_position++;
+          if(wait_point_position==wait_point_list_[wait_point_vector].size()){
+            wait_point_position=0;
+            wait_point_vector++;
+          }
+        }
+        //If at wait point, first time through, check operations to know if 
transaction is safe
+        //If it succeeds, all subsequent operations in txn can skip this step
+        //If it fails, then it goes into if, sees operations_checked is 
negative, and skips the operation, as txn was aborted
+        
if(op.transaction_number==wait_point_list_[wait_point_vector][wait_point_position]
 && 
operations_checked_[op.transaction_number].load()!=transaction_tracker_[op.transaction_number]){
+          
if(operations_checked_.find(op.transaction_number)==operations_checked.end()){
+          }
+          if(operations_checked_[op.transaction_number].load()<0){
+            continue;
+          }
+          KVOperation current_op=op;
+          int current_txn_id=op.transaction_number;
+          int new_op_count=op_count;
+          bool same_txn=true;
+          //Verify all operations in txn
+          while(same_txn){ 
+            if(operations_checked_[current_txn_id].load()>=0 && 
current_op.op.cmd() == Operation::SET){   
+              if(!VerifyRequest(current_op.op.key(), current_op.op.value())){
+                //If verify fails, set to -2*transaction_tracker_[current_txn_id] so it never becomes 
positive
+                
operations_checked_[current_txn_id].store(-2*transaction_tracker_[current_txn_id]);
+              }
+            }
+            ++operations_checked_[current_txn_id];
+            new_op_count++;
+            current_op=range_ops[new_op_count];
+            //Continues looping through operations until next operation has a 
different txn id
+            if(current_op.transaction_number!=current_txn_id){
+              same_txn=false;
+              new_op_count--;
+            }
+          }
+          //Wait for all threads to finish checking ops from this txn
+          
while(operations_checked_[current_txn_id].load()!=transaction_tracker_[current_txn_id]
 && operations_checked_[current_txn_id].load()>0){}
+          //Check again in case another thread had an abort, if so, skip to 
first operation of next txn id
+          if(operations_checked_[current_txn_id].load()<0){
+            op_count=new_op_count;
+            continue;
+          }
+        }
+        //After all checks are done, can execute operations
         std::unique_ptr<std::string> response = ExecuteData(op);
         if (response == nullptr) {
           response = std::make_unique<std::string>();
@@ -226,6 +281,8 @@ std::unique_ptr<BatchUserResponse> 
QueccExecutor::ExecuteBatch(
   operation_list_.clear();
   key_weight_.clear();
   multi_op_transactions_.clear();
+  wait_point_list_.clear();
+  operations_checked_.clear();
   multi_op_transactions_numbers_.clear();
   total_weight_=0;
 
@@ -286,6 +343,7 @@ std::unique_ptr<BatchUserResponse> 
QueccExecutor::ExecuteBatch(
       }
       transaction_tracker_[txn_id]=1;
     }
+    operations_checked_[txn_id].store(0);
     txn_id++;
   }
 
diff --git a/executor/kv/quecc_executor.h b/executor/kv/quecc_executor.h
index 511c1508..1ea2bbab 100644
--- a/executor/kv/quecc_executor.h
+++ b/executor/kv/quecc_executor.h
@@ -103,6 +103,7 @@ class QueccExecutor : public TransactionManager {
   vector<vector<int>> multi_op_number_batches_;
   atomic<bool> multi_op_ready_;
   vector<vector<int>> wait_point_list_;
+  unordered_map<int, atomic<int>> operations_checked_;
 };
 
 }  // namespace resdb

Reply via email to