This is an automated email from the ASF dual-hosted git repository.

saipranav pushed a commit to branch QueccBranch
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/QueccBranch by this push:
     new add65d98 Added finding wait points code
add65d98 is described below

commit add65d9806f564caacdf92b7aa920eafc5fc84a5
Author: Saipranav Kotamreddy <[email protected]>
AuthorDate: Wed Feb 7 09:57:26 2024 -0800

    Added finding wait points code
---
 executor/kv/quecc_executor.cpp | 19 +++++++++++++++++++
 executor/kv/quecc_executor.h   |  1 +
 2 files changed, 20 insertions(+)

diff --git a/executor/kv/quecc_executor.cpp b/executor/kv/quecc_executor.cpp
index 3a854617..f4bfe5fe 100644
--- a/executor/kv/quecc_executor.cpp
+++ b/executor/kv/quecc_executor.cpp
@@ -64,6 +64,7 @@ QueccExecutor::QueccExecutor(std::unique_ptr<Storage> storage)
     batch_ready_.push_back(false);
     multi_op_batches_.push_back(std::vector<KVRequest>());
     multi_op_number_batches_.push_back(std::vector<int>());
+    wait_point_list_.push_back(std::vector<int>());
     multi_op_ready_.store(false);
 
     std::thread planner(&QueccExecutor::PlannerThread, this, thread_number);
@@ -148,6 +149,24 @@ void QueccExecutor::PlannerThread(const int thread_number) {
 
     if(multi_op_ready_.load()){
       //Process multi_op distributions for wait positions
+      vector<KVRequest> multi_op_batch = multi_op_batches_[thread_number];
+      vector<int> multi_op_id = multi_op_number_batches_[thread_number];
+
+      //Look through each KV_Request, if a KV_Request is ever across 2 ranges
+      //it could cause a cascading abort and is therefore recorded as a wait point
+      int txn_id=0;
+      set<int> range_used_list = set<int>();
+      for(int i=0; i<multi_op_batch.size(); i++){
+        txn_id=multi_op_id[i];
+        range_used_list.clear();
+        for(const auto& request_operation: multi_op_batch[i].ops()){
+          range_used_list.insert(rid_to_range_.at(request_operation.key()));
+          if(range_used_list.size()>1){
+            wait_point_list_[thread_number].push_back(txn_id);
+            break;
+          }
+        }
+      }
 
       ready_planner_count_.fetch_sub(1);
       if (ready_planner_count_.load() == 0) {
diff --git a/executor/kv/quecc_executor.h b/executor/kv/quecc_executor.h
index 786ced11..511c1508 100644
--- a/executor/kv/quecc_executor.h
+++ b/executor/kv/quecc_executor.h
@@ -102,6 +102,7 @@ class QueccExecutor : public TransactionManager {
   vector<vector<KVRequest>> multi_op_batches_;
   vector<vector<int>> multi_op_number_batches_;
   atomic<bool> multi_op_ready_;
+  vector<vector<int>> wait_point_list_;
 };
 
 }  // namespace resdb

Reply via email to