This is an automated email from the ASF dual-hosted git repository.
saipranav pushed a commit to branch QueccBranch
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/QueccBranch by this push:
new d891b4ae "Added processing of wait points and verifies"
d891b4ae is described below
commit d891b4aeb7df5332aa5b14c26e9e2300145a8cdf
Author: Saipranav Kotamreddy <[email protected]>
AuthorDate: Mon Mar 4 09:54:13 2024 -0800
"Added processing of wait points and verifies"
---
executor/kv/quecc_executor.cpp | 60 +++++++++++++++++++++++++++++++++++++++++-
executor/kv/quecc_executor.h | 1 +
2 files changed, 60 insertions(+), 1 deletion(-)
diff --git a/executor/kv/quecc_executor.cpp b/executor/kv/quecc_executor.cpp
index f4bfe5fe..d92b3a3b 100644
--- a/executor/kv/quecc_executor.cpp
+++ b/executor/kv/quecc_executor.cpp
@@ -188,13 +188,68 @@ void QueccExecutor::PlannerThread(const int
thread_number) {
batch_array_[thread_number].clear();
const int& range_being_executed = thread_number;
+ int wait_point_position=0;
+ int wait_point_vector=0;
+ vector<vector<int>> local_wait_points;
+ for(const auto& waitPointVector : wait_point_list_){
+ local_wait_points.push_back(waitPointVector);
+ }
+
execute_barrier.arrive_and_wait();
// Executor
for (int priority = 0; priority < thread_count_; priority++) {
std::vector<KVOperation>& range_ops =
this->sorted_transactions_[priority][range_being_executed];
- for (KVOperation& op : range_ops) {
+ for (int op_count; op_count<range_ops.size(); op_count++) {
+ KVOperation op = range_ops[op_count];
+ //If past wait point, check for next wait point
+
while(op.transaction_number>wait_point_list_[wait_point_vector][wait_point_position]){
+ wait_point_position++;
+ if(wait_point_position==wait_point_list_[wait_point_vector].size()){
+ wait_point_position=0;
+ wait_point_vector++;
+ }
+ }
+ //If at wait point, first time through, check operations to know if
transaction is safe
+ //If it succeeds, all subsequent operations in txn can skip this step
+ //If it fails, then it goes into if, sees operations_checked is
negative, and skips the operation, as txn was aborted
+
if(op.transaction_number==wait_point_list_[wait_point_vector][wait_point_position]
&&
operations_checked_[op.transaction_number].load()!=transaction_tracker_[op.transaction_number]){
+
if(operations_checked_.find(op.transaction_number)==operations_checked.end()){
+ }
+ if(operations_checked_[op.transaction_number].load()<0){
+ continue;
+ }
+ KVOperation current_op=op;
+ int current_txn_id=op.transaction_number;
+ int new_op_count=op_count;
+ bool same_txn=true;
+ //Verify all operations in txn
+ while(same_txn){
+ if(operations_checked_[current_txn_id].load()>=0 &&
current_op.op.cmd() == Operation::SET){
+ if(!VerifyRequest(current_op.op.key(), current_op.op.value())){
+ //If verify fails, set to -2*transaction_tracker_[current_txn_id] so it never becomes
positive
+
operations_checked_[current_txn_id].store(-2*transaction_tracker_[current_txn_id]);
+ }
+ }
+ ++operations_checked_[current_txn_id];
+ new_op_count++;
+ current_op=range_ops[new_op_count];
+ //Continues looping through operations until next operation has a
different txn id
+ if(current_op.transaction_number!=current_txn_id){
+ same_txn=false;
+ new_op_count--;
+ }
+ }
+ //Wait for all threads to finish checking ops from this txn
+
while(operations_checked_[current_txn_id].load()!=transaction_tracker_[current_txn_id]
&& operations_checked_[current_txn_id].load()>0){}
+ //Check again in case another thread had an abort, if so, skip to
first operation of next txn id
+ if(operations_checked_[current_txn_id].load()<0){
+ op_count=new_op_count;
+ continue;
+ }
+ }
+ //After all checks are done, can execute operations
std::unique_ptr<std::string> response = ExecuteData(op);
if (response == nullptr) {
response = std::make_unique<std::string>();
@@ -226,6 +281,8 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
operation_list_.clear();
key_weight_.clear();
multi_op_transactions_.clear();
+ wait_point_list_.clear();
+ operations_checked_.clear();
multi_op_transactions_numbers_.clear();
total_weight_=0;
@@ -286,6 +343,7 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
}
transaction_tracker_[txn_id]=1;
}
+ operations_checked_[txn_id].store(0);
txn_id++;
}
diff --git a/executor/kv/quecc_executor.h b/executor/kv/quecc_executor.h
index 511c1508..1ea2bbab 100644
--- a/executor/kv/quecc_executor.h
+++ b/executor/kv/quecc_executor.h
@@ -103,6 +103,7 @@ class QueccExecutor : public TransactionManager {
vector<vector<int>> multi_op_number_batches_;
atomic<bool> multi_op_ready_;
vector<vector<int>> wait_point_list_;
+ unordered_map<int, atomic<int>> operations_checked_;
};
} // namespace resdb