This is an automated email from the ASF dual-hosted git repository.
saipranav pushed a commit to branch QueccBranch
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/QueccBranch by this push:
new e1b1acdf "Split quecc files into no wait and wait variants"
e1b1acdf is described below
commit e1b1acdfaf3d577aa7ae5e31e8a2cd03c963a7c4
Author: Saipranav Kotamreddy <[email protected]>
AuthorDate: Thu Apr 25 20:46:47 2024 -0700
"Split quecc files into no wait and wait variants"
---
.../kv/{quecc_executor.h => quecc_executor wait.h} | 0
executor/kv/quecc_executor.cpp | 146 ++-------------------
executor/kv/quecc_executor.h | 9 +-
...{quecc_executor.cpp => quecc_executor_wait.cpp} | 21 +--
4 files changed, 25 insertions(+), 151 deletions(-)
diff --git a/executor/kv/quecc_executor.h b/executor/kv/quecc_executor wait.h
similarity index 100%
copy from executor/kv/quecc_executor.h
copy to executor/kv/quecc_executor wait.h
diff --git a/executor/kv/quecc_executor.cpp b/executor/kv/quecc_executor.cpp
index 83560fa8..727d895d 100644
--- a/executor/kv/quecc_executor.cpp
+++ b/executor/kv/quecc_executor.cpp
@@ -62,10 +62,6 @@ QueccExecutor::QueccExecutor(std::unique_ptr<Storage>
storage)
sorted_transactions_.push_back(
std::vector<std::vector<KVOperation>>(thread_count_));
batch_ready_.push_back(false);
- multi_op_batches_.push_back(std::vector<KVRequest>());
- multi_op_number_batches_.push_back(std::vector<int>());
- wait_point_list_.push_back(std::vector<int>());
- multi_op_ready_.store(false);
std::thread planner(&QueccExecutor::PlannerThread, this, thread_number);
@@ -147,35 +143,6 @@ void QueccExecutor::PlannerThread(const int thread_number)
{
}
}
- if(multi_op_ready_.load()){
- //Process multi_op distributions for wait positions
- vector<KVRequest> multi_op_batch = multi_op_batches_[thread_number];
- vector<int> multi_op_id = multi_op_number_batches_[thread_number];
-
- //Look through each KV_Request, if a KV_Request is ever across 2 ranges
- //it could cause a cascading abort and is therefore recorded as a wait
point
- int txn_id=0;
- set<int> range_used_list = set<int>();
- for(size_t i=0; i<multi_op_batch.size(); i++){
- txn_id=multi_op_id[i];
- range_used_list.clear();
- for(const auto& request_operation: multi_op_batch[i].ops()){
- range_used_list.insert(rid_to_range_.at(request_operation.key()));
- if(range_used_list.size()>1){
- wait_point_list_[thread_number].push_back(txn_id);
- break;
- }
- }
- }
-
- ready_planner_count_.fetch_sub(1);
- if (ready_planner_count_.load() == 0) {
- cv2_.notify_all();
- }
- batch_ready_[thread_number] = false;
- continue;
- }
-
// Planner
std::vector<KVOperation> batch =
std::move(batch_array_[thread_number]);
@@ -188,68 +155,13 @@ void QueccExecutor::PlannerThread(const int
thread_number) {
batch_array_[thread_number].clear();
const int& range_being_executed = thread_number;
- int wait_point_position=0;
- int wait_point_vector=0;
- vector<vector<int>> local_wait_points;
- for(const auto& waitPointVector : wait_point_list_){
- local_wait_points.push_back(waitPointVector);
- }
-
execute_barrier.arrive_and_wait();
// Executor
for (int priority = 0; priority < thread_count_; priority++) {
std::vector<KVOperation>& range_ops =
this->sorted_transactions_[priority][range_being_executed];
- for (size_t op_count=0; op_count<range_ops.size(); op_count++) {
- KVOperation op = range_ops[op_count];
- //If past wait point, check for next wait point
-
while(op.transaction_number>wait_point_list_[wait_point_vector][wait_point_position]){
- wait_point_position++;
-
if(wait_point_position==(int)wait_point_list_[wait_point_vector].size()){
- wait_point_position=0;
- wait_point_vector++;
- }
- }
- //If at wait point, first time through, check operations to know if
transaction is safe
- //If it succeeds, all subsequent operations in txn can skip this step
-    //If it fails, then it goes into if, sees operations_checked is
negative, and skips the operation, as txn was aborted
-
if(op.transaction_number==wait_point_list_[wait_point_vector][wait_point_position]
&&
operations_checked_[op.transaction_number].load()!=transaction_tracker_[op.transaction_number]){
-
if(operations_checked_.find(op.transaction_number)==operations_checked_.end()){
- }
- if(operations_checked_[op.transaction_number].load()<0){
- continue;
- }
- KVOperation current_op=op;
- int current_txn_id=op.transaction_number;
- int new_op_count=op_count;
- bool same_txn=true;
- //Verify all operations in txn
- while(same_txn){
- if(operations_checked_[current_txn_id].load()>=0 &&
current_op.op.cmd() == Operation::SET){
- if(!VerifyRequest(current_op.op.key(), current_op.op.value())){
- //If verify fails, set to -2*op_count so it never becomes
positive
-
operations_checked_[current_txn_id].store(-2*transaction_tracker_[current_txn_id]);
- }
- }
- ++operations_checked_[current_txn_id];
- new_op_count++;
- current_op=range_ops[new_op_count];
- //Continues looping through operations until next operation has a
different txn id
- if(current_op.transaction_number!=current_txn_id){
- same_txn=false;
- new_op_count--;
- }
- }
- //Wait for all threads to finish checking ops from this txn
-
while(operations_checked_[current_txn_id].load()!=transaction_tracker_[current_txn_id]
&& operations_checked_[current_txn_id].load()>0){}
- //Check again in case another thread had an abort, if so, skip to
first operation of next txn id
- if(operations_checked_[current_txn_id].load()<0){
- op_count=new_op_count;
- continue;
- }
- }
- //After all checks are done, can execute operations
+ for (KVOperation& op : range_ops) {
std::unique_ptr<std::string> response = ExecuteData(op);
if (response == nullptr) {
response = std::make_unique<std::string>();
@@ -280,10 +192,6 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
transaction_tracker_.clear();
operation_list_.clear();
key_weight_.clear();
- multi_op_transactions_.clear();
- wait_point_list_.clear();
- operations_checked_.clear();
- multi_op_transactions_numbers_.clear();
total_weight_=0;
for (int i = 0; i < (int)batch_array_.size(); i++) {
@@ -302,10 +210,6 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
// printf("txn id: %lu\n", kv_request.txn_id());
// printf("kv_request size: %d\n", kv_request.ops_size());
if (kv_request.ops_size()) {
- if(kv_request.ops_size()>1){
- multi_op_transactions_.push_back(kv_request);
- multi_op_transactions_numbers_.push_back(txn_id);
- }
transaction_tracker_[txn_id] = kv_request.ops_size();
for(const auto& op : kv_request.ops()){
KVOperation newOp;
@@ -343,7 +247,6 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
}
transaction_tracker_[txn_id]=1;
}
- operations_checked_[txn_id].store(0);
txn_id++;
}
@@ -356,42 +259,17 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
batch_number++;
}
}
- CreateRanges();
-
- int multi_op_split_size =
multi_op_transactions_numbers_.size()/thread_count_+1;
- int count_per_split=0;
- int op_batch_number=0;
- //Send multi operation transactions to thread to gauge if they require waits
- for(size_t i=0; i<multi_op_transactions_numbers_.size(); i++){
- multi_op_ready_.store(true);
- multi_op_batches_[op_batch_number].push_back(multi_op_transactions_[i]);
-
multi_op_number_batches_[op_batch_number].push_back(multi_op_transactions_numbers_[i]);
- count_per_split++;
- if(count_per_split>multi_op_split_size){
- count_per_split=0;
- op_batch_number++;
- }
- }
-
- if(multi_op_ready_.load()){
- ready_planner_count_.fetch_add(thread_count_);
- // Allows planner threads to start consuming
- for (int i = 0; i < thread_count_; i++) {
- batch_ready_[i] = true;
- }
- cv_.notify_all();
-
- // Wait for threads to finish to get batch response
- while (ready_planner_count_.load() != 0) {
- std::unique_lock<std::mutex> lk2(mutex2_);
- if (cv2_.wait_for(lk2, std::chrono::microseconds(100),
- [this] { return is_stop_; })) {
- return std::move(this->batch_response_);
- }
- }
- multi_op_ready_.store(false);
+/*
+ // RIDs in hash map are now equal to which range they go into
+ int range_count = 0;
+ int range_size = ((rid_to_range_.size() - 1) / thread_count_) + 1;
+ for (const auto& key : rid_to_range_) {
+ rid_to_range_[key.first] = range_count / range_size;
+ range_count++;
}
-
+*/
+ CreateRanges();
+
ready_planner_count_.fetch_add(thread_count_);
// Allows planner threads to start consuming
for (int i = 0; i < thread_count_; i++) {
@@ -488,4 +366,4 @@ std::string QueccExecutor::GetRange(const std::string&
min_key,
return storage_->GetRange(min_key, max_key);
}
-} // namespace resdb
+} // namespace resdb
\ No newline at end of file
diff --git a/executor/kv/quecc_executor.h b/executor/kv/quecc_executor.h
index 1ea2bbab..8f18e9b5 100644
--- a/executor/kv/quecc_executor.h
+++ b/executor/kv/quecc_executor.h
@@ -97,13 +97,6 @@ class QueccExecutor : public TransactionManager {
atomic<int> ready_planner_count_;
std::unique_ptr<BatchUserResponse> batch_response_;
std::unique_ptr<Storage> storage_;
- vector<KVRequest> multi_op_transactions_;
- vector<int> multi_op_transactions_numbers_;
- vector<vector<KVRequest>> multi_op_batches_;
- vector<vector<int>> multi_op_number_batches_;
- atomic<bool> multi_op_ready_;
- vector<vector<int>> wait_point_list_;
- unordered_map<int, atomic<int>> operations_checked_;
};
-} // namespace resdb
+} // namespace resdb
\ No newline at end of file
diff --git a/executor/kv/quecc_executor.cpp
b/executor/kv/quecc_executor_wait.cpp
similarity index 94%
copy from executor/kv/quecc_executor.cpp
copy to executor/kv/quecc_executor_wait.cpp
index 83560fa8..9cae2bc9 100644
--- a/executor/kv/quecc_executor.cpp
+++ b/executor/kv/quecc_executor_wait.cpp
@@ -151,7 +151,7 @@ void QueccExecutor::PlannerThread(const int thread_number) {
//Process multi_op distributions for wait positions
vector<KVRequest> multi_op_batch = multi_op_batches_[thread_number];
vector<int> multi_op_id = multi_op_number_batches_[thread_number];
-
+ std::cout<<"Hello 2.5 "<< multi_op_batch.size() << std::endl;
//Look through each KV_Request, if a KV_Request is ever across 2 ranges
//it could cause a cascading abort and is therefore recorded as a wait
point
int txn_id=0;
@@ -162,6 +162,7 @@ void QueccExecutor::PlannerThread(const int thread_number) {
for(const auto& request_operation: multi_op_batch[i].ops()){
range_used_list.insert(rid_to_range_.at(request_operation.key()));
if(range_used_list.size()>1){
+ std::cout<<"Hello 5"<<std::endl;
wait_point_list_[thread_number].push_back(txn_id);
break;
}
@@ -194,7 +195,7 @@ void QueccExecutor::PlannerThread(const int thread_number) {
for(const auto& waitPointVector : wait_point_list_){
local_wait_points.push_back(waitPointVector);
}
-
+ std::cout<<"Hello 6"<<std::endl;
execute_barrier.arrive_and_wait();
// Executor
@@ -203,18 +204,19 @@ void QueccExecutor::PlannerThread(const int
thread_number) {
this->sorted_transactions_[priority][range_being_executed];
for (size_t op_count=0; op_count<range_ops.size(); op_count++) {
KVOperation op = range_ops[op_count];
- //If past wait point, check for next wait point
-
while(op.transaction_number>wait_point_list_[wait_point_vector][wait_point_position]){
+ //If past wait point, check for next wait point
+ while(wait_point_vector<wait_point_list_.size() &&
op.transaction_number>wait_point_list_[wait_point_vector][wait_point_position]){
wait_point_position++;
if(wait_point_position==(int)wait_point_list_[wait_point_vector].size()){
wait_point_position=0;
wait_point_vector++;
}
}
+ //std::cout<<"Hello 7"<<std::endl;
//If at wait point, first time through, check operations to know if
transaction is safe
//If it succeeds, all subsequent operations in txn can skip this step
//If it fails, then it goes into if, sees operations_checked is
negative, and skips the operation, as txn was aborted
-
if(op.transaction_number==wait_point_list_[wait_point_vector][wait_point_position]
&&
operations_checked_[op.transaction_number].load()!=transaction_tracker_[op.transaction_number]){
+ if(wait_point_vector<wait_point_list_.size() &&
op.transaction_number==wait_point_list_[wait_point_vector][wait_point_position]
&&
operations_checked_[op.transaction_number].load()!=transaction_tracker_[op.transaction_number]){
if(operations_checked_.find(op.transaction_number)==operations_checked_.end()){
}
if(operations_checked_[op.transaction_number].load()<0){
@@ -275,6 +277,7 @@ void QueccExecutor::PlannerThread(const int thread_number) {
std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
const BatchUserRequest& request) {
+ std::cout<<"Hello"<<std::endl;
this->batch_response_ = std::make_unique<BatchUserResponse>();
rid_to_range_.clear();
transaction_tracker_.clear();
@@ -300,7 +303,7 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
}
// printf("txn id: %lu\n", kv_request.txn_id());
- // printf("kv_request size: %d\n", kv_request.ops_size());
+ //printf("kv_request size: %d\n", kv_request.ops_size());
if (kv_request.ops_size()) {
if(kv_request.ops_size()>1){
multi_op_transactions_.push_back(kv_request);
@@ -346,7 +349,7 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
operations_checked_[txn_id].store(0);
txn_id++;
}
-
+ std::cout<<"Hello 2 "<<operation_list_.size() << " "<<
multi_op_transactions_.size()<<std::endl;
int planner_vector_size =
(operation_list_.size() + thread_count_ - 1) / thread_count_;
for (const auto& op : operation_list_) {
@@ -372,7 +375,7 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
op_batch_number++;
}
}
-
+ std::cout<<"Hello 3 "<< multi_op_batches_[0].size()<<std::endl;
if(multi_op_ready_.load()){
ready_planner_count_.fetch_add(thread_count_);
// Allows planner threads to start consuming
@@ -391,7 +394,7 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
}
multi_op_ready_.store(false);
}
-
+ std::cout<<"Hello 4 "<<std::endl;
ready_planner_count_.fetch_add(thread_count_);
// Allows planner threads to start consuming
for (int i = 0; i < thread_count_; i++) {