This is an automated email from the ASF dual-hosted git repository.
saipranav pushed a commit to branch QueccBranch
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/QueccBranch by this push:
new a7596f8a "Fixed benchmark issues, added new experiments"
a7596f8a is described below
commit a7596f8ae9a39baef0f11c1445b4e8f5adb4e705
Author: Saipranav Kotamreddy <[email protected]>
AuthorDate: Wed May 22 17:19:36 2024 -0700
"Fixed benchmark issues, added new experiments"
---
benchmark/protocols/quecc/BUILD | 4 +-
benchmark/protocols/quecc/quecc_benchmark.cpp | 38 ++++++++++----
chain/storage/leveldb.cpp | 15 +++++-
executor/kv/BUILD | 13 +++++
executor/kv/quecc_executor.cpp | 27 ----------
executor/kv/strict_executor.cpp | 71 ++++++++++++++-------------
executor/kv/strict_executor.h | 19 +++++--
7 files changed, 111 insertions(+), 76 deletions(-)
diff --git a/benchmark/protocols/quecc/BUILD b/benchmark/protocols/quecc/BUILD
index 65c627b1..c0d20ecc 100644
--- a/benchmark/protocols/quecc/BUILD
+++ b/benchmark/protocols/quecc/BUILD
@@ -9,9 +9,11 @@ cc_binary(
"//chain/state:chain_state",
"//executor/kv:kv_executor",
"//executor/kv:quecc_executor",
+ "//executor/kv:strict_executor",
"//platform/config:resdb_config_utils",
"//proto/kv:kv_cc_proto",
"//chain/storage:memory_db",
- "//chain/storage:leveldb"
+ "//chain/storage:leveldb",
+ "//chain/storage:rocksdb"
],
)
diff --git a/benchmark/protocols/quecc/quecc_benchmark.cpp
b/benchmark/protocols/quecc/quecc_benchmark.cpp
index 5e9f4fe8..6a6d5209 100644
--- a/benchmark/protocols/quecc/quecc_benchmark.cpp
+++ b/benchmark/protocols/quecc/quecc_benchmark.cpp
@@ -29,10 +29,12 @@
#include <cstdint>
#include <ctime>
#include "chain/storage/leveldb.h"
+#include "chain/storage/rocksdb.h"
#include "chain/storage/memory_db.h"
#include "executor/kv/kv_executor.h"
#include "executor/kv/quecc_executor.h"
+#include "executor/kv/strict_executor.h"
#include "platform/config/resdb_config_utils.h"
#include "platform/proto/resdb.pb.h"
#include "proto/kv/kv.pb.h"
@@ -45,7 +47,9 @@ using resdb::KVOperation;
using resdb::KVRequest;
using resdb::storage::MemoryDB;
using resdb::storage::NewResLevelDB;
+using resdb::storage::NewResRocksDB;
using resdb::QueccExecutor;
+using resdb::StrictExecutor;
using resdb::ResConfigData;
using resdb::ResDBConfig;
using resdb::Storage;
@@ -63,7 +67,7 @@ BatchUserRequest EqualDistribution() {
// add transaction
KVRequest request;
- for (int j = 0; j < 100; j++) {
+ for (int j = 0; j < 1000; j++) {
// add operation
resdb::Operation* op = request.add_ops();
op->set_cmd(resdb::Operation::SET);
@@ -141,15 +145,20 @@ int main(int argc, char** argv) {
// random_split_array.push_back(RandomDistribution());
}
//KVExecutor kv_executor=KVExecutor(std::make_unique<MemoryDB>());
- KVExecutor kv_executor=KVExecutor(NewResLevelDB("/tmp/leveldb_test"));
+ //KVExecutor kv_executor=KVExecutor(NewResLevelDB("/tmp/leveldb_test1"));
+
//QueccExecutor quecc_executor=QueccExecutor(std::make_unique<MemoryDB>());
- QueccExecutor
quecc_executor=QueccExecutor(NewResLevelDB("/tmp/leveldb_test"));
+ //QueccExecutor
quecc_executor=QueccExecutor(NewResLevelDB("/tmp/leveldb_test7", std::nullopt));
+
+ //StrictExecutor
strict_executor=StrictExecutor(std::make_unique<MemoryDB>());
+ StrictExecutor
strict_executor=StrictExecutor(NewResLevelDB("/tmp/leveldb_test8",
std::nullopt));
+ std::cout<<"Hello 2"<<std::endl;
std::unique_ptr<BatchUserResponse> response;
// Equal Split Comparison
printf("Equal Split Times\n");
- auto start_time = std::chrono::high_resolution_clock::now();
+ /*auto start_time = std::chrono::high_resolution_clock::now();
for (BatchUserRequest equal_split : equal_split_array) {
response = quecc_executor.ExecuteBatch(equal_split);
}
@@ -157,14 +166,25 @@ int main(int argc, char** argv) {
auto end_time = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
end_time - start_time);
- printf("Quecc Time Taken: %d\n", (int)duration.count());
- start_time = std::chrono::high_resolution_clock::now();
+ printf("Quecc Time Taken: %d\n", (int)duration.count());*/
+
+ /*auto start_time = std::chrono::high_resolution_clock::now();
for (BatchUserRequest equal_split : equal_split_array) {
response = kv_executor.ExecuteBatch(equal_split);
}
- end_time = std::chrono::high_resolution_clock::now();
- duration = std::chrono::duration_cast<std::chrono::microseconds>(end_time -
+ auto end_time = std::chrono::high_resolution_clock::now();
+ auto duration =
std::chrono::duration_cast<std::chrono::microseconds>(end_time -
start_time);
- printf("KV Time Taken: %d\n", (int)duration.count());
+ printf("KV Time Taken: %d\n", (int)duration.count());*/
+
+ auto start_time = std::chrono::high_resolution_clock::now();
+ for (BatchUserRequest equal_split : equal_split_array) {
+ response = strict_executor.ExecuteBatch(equal_split);
+ }
+
+ auto end_time = std::chrono::high_resolution_clock::now();
+ auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
+ end_time - start_time);
+ printf("2PL Time Taken: %d\n", (int)duration.count());
}
\ No newline at end of file
diff --git a/chain/storage/leveldb.cpp b/chain/storage/leveldb.cpp
index 2cc96fb5..6fabbea7 100644
--- a/chain/storage/leveldb.cpp
+++ b/chain/storage/leveldb.cpp
@@ -23,6 +23,8 @@
#include "chain/storage/proto/kv.pb.h"
+std::mutex batch_mutex;
+
namespace resdb {
namespace storage {
@@ -77,18 +79,29 @@ ResLevelDB::~ResLevelDB() {
}
int ResLevelDB::SetValue(const std::string& key, const std::string& value) {
+ leveldb::WriteBatch localBatch;
+ batch_mutex.lock();
batch_.Put(key, value);
if (batch_.ApproximateSize() >= write_batch_size_) {
- leveldb::Status status = db_->Write(leveldb::WriteOptions(), &batch_);
+ localBatch.Append(batch_);
+ //batch_.Clear();
+ //batch_mutex.unlock();
+ leveldb::Status status = db_->Write(leveldb::WriteOptions(), &localBatch);
if (status.ok()) {
+ //batch_mutex.lock();
batch_.Clear();
+ batch_mutex.unlock();
return 0;
} else {
LOG(ERROR) << "flush buffer fail:" << status.ToString();
+ //batch_mutex.lock();
+ batch_.Append(localBatch);
+ batch_mutex.unlock();
return -1;
}
}
+ batch_mutex.unlock();
return 0;
}
diff --git a/executor/kv/BUILD b/executor/kv/BUILD
index e05aad22..3005b3e9 100644
--- a/executor/kv/BUILD
+++ b/executor/kv/BUILD
@@ -46,6 +46,19 @@ cc_library(
],
)
+cc_library(
+ name = "strict_executor",
+ srcs = ["strict_executor.cpp"],
+ hdrs = ["strict_executor.h"],
+ deps = [
+ "//chain/storage",
+ "//common:comm",
+ "//executor/common:transaction_manager",
+ "//platform/config:resdb_config_utils",
+ "//proto/kv:kv_cc_proto",
+ ],
+)
+
cc_test(
name = "kv_executor_test",
srcs = ["kv_executor_test.cpp"],
diff --git a/executor/kv/quecc_executor.cpp b/executor/kv/quecc_executor.cpp
index 13c2d23a..a1959a57 100644
--- a/executor/kv/quecc_executor.cpp
+++ b/executor/kv/quecc_executor.cpp
@@ -156,7 +156,6 @@ void QueccExecutor::PlannerThread(const int thread_number) {
const int& range_being_executed = thread_number;
execute_barrier.arrive_and_wait();
- auto start_time = std::chrono::high_resolution_clock::now();
// Executor
for (int priority = 0; priority < thread_count_; priority++) {
@@ -171,10 +170,6 @@ void QueccExecutor::PlannerThread(const int thread_number)
{
}
this->sorted_transactions_[priority][range_being_executed].clear();
}
- auto end_time = std::chrono::high_resolution_clock::now();
- auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
- end_time - start_time);
- //printf("Time Taken 5: %d\n", (int)duration.count());
// Lock used to minimize conflicts of adding to batchresponse
results_mutex.lock();
@@ -192,7 +187,6 @@ void QueccExecutor::PlannerThread(const int thread_number) {
std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
const BatchUserRequest& request) {
- auto start_time = std::chrono::high_resolution_clock::now();
this->batch_response_ = std::make_unique<BatchUserResponse>();
rid_to_range_.clear();
@@ -254,15 +248,7 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
transaction_tracker_[txn_id]=1;
}
txn_id++;
- auto end_time = std::chrono::high_resolution_clock::now();
- auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
- end_time - start_time);
- //printf("Time Taken 5: %d\n", (int)duration.count());
}
- auto end_time = std::chrono::high_resolution_clock::now();
- auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
- end_time - start_time);
- printf("Time Taken 1: %d\n", (int)duration.count());
int planner_vector_size =
(operation_list_.size() + thread_count_ - 1) / thread_count_;
@@ -283,20 +269,12 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
}
*/
CreateRanges();
- end_time = std::chrono::high_resolution_clock::now();
- duration = std::chrono::duration_cast<std::chrono::microseconds>(
- end_time - start_time);
- printf("Time Taken 2: %d\n", (int)duration.count());
-
ready_planner_count_.fetch_add(thread_count_);
// Allows planner threads to start consuming
for (int i = 0; i < thread_count_; i++) {
batch_ready_[i] = true;
}
cv_.notify_all();
- end_time = std::chrono::high_resolution_clock::now();
- duration = std::chrono::duration_cast<std::chrono::microseconds>(
- end_time - start_time);
// Wait for threads to finish to get batch response
while (ready_planner_count_.load() != 0) {
@@ -325,11 +303,6 @@ std::unique_ptr<std::string>
QueccExecutor::ExecuteData(const KVOperation& oper)
if (!kv_response.SerializeToString(resp_str.get())) {
return nullptr;
}
- // Add some way to decrement number of KVOperations for the txn in
- // transaction_tracker here, could have map of int->atomic_int
- // If a txn has 0 KVOperations remaining, remove it
- // Any txns left in map by end had some issue/aborted and we know which
- // txns must be undone
return resp_str;
}
diff --git a/executor/kv/strict_executor.cpp b/executor/kv/strict_executor.cpp
index b4d73ab2..1d4fac0d 100644
--- a/executor/kv/strict_executor.cpp
+++ b/executor/kv/strict_executor.cpp
@@ -17,25 +17,25 @@
* under the License.
*/
-#include "executor/kv/kv_executor.h"
+#include "executor/kv/strict_executor.h"
#include <glog/logging.h>
-
+using namespace std;
+using resdb::KVRequest;
namespace resdb {
StrictExecutor::StrictExecutor(std::unique_ptr<Storage> storage)
: storage_(std::move(storage)) {
int thread_count_=4;
for(int i=0; i<thread_count_;i++){
- std::thread planner(&StrictExecutor::WorkerThread, this, NULL);
+ std::thread worker(&StrictExecutor::WorkerThread, this);
// thread pinning
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
- CPU_SET(thread_number, &cpuset);
- int status = pthread_setaffinity_np(planner.native_handle(),
- sizeof(cpu_set_t), &cpuset);
- thread_list_.push_back(move(planner));
+ CPU_SET(i, &cpuset);
+ //int status =
pthread_setaffinity_np(planner.native_handle(),sizeof(cpu_set_t), &cpuset);
+ thread_list_.push_back(move(worker));
}
}
@@ -56,22 +56,25 @@ void StrictExecutor::WorkerThread(){
std::set<std::string> held_locks;
bool lock_conflict;
std::string key;
+ std::unique_lock<std::mutex> queue_lock(queue_lock_);
+ std::unique_lock<std::mutex> map_lock(map_lock_);
+ std::unique_lock<std::mutex> response_lock(response_lock_);
while(true){
//Wait until request for thread to process is added and grab it
- queue_lock_.lock();
- while(request_queue.size()==0){
- if (not_empty_queue_.wait_for(queue_lock_,
std::chrono::microseconds(100),
- [this] { return is_stop_; })) {
+ queue_lock.lock();
+ while(request_queue_.size()==0){
+ if (not_empty_queue_.wait_for(queue_lock, std::chrono::microseconds(100),
+ [&] { return is_stop_; })) {
queue_lock_.unlock();
return;
}
}
kv_request=request_queue_.front();
- request_queue.pop();
- queue_lock_.unlock();
+ request_queue_.pop();
+ queue_lock.unlock();
//Grab locks needed for request
- map_lock_.lock();
+ map_lock.lock();
//If multiple operations in kv_request
if(kv_request.ops_size()){
lock_conflict=true;
@@ -91,13 +94,13 @@ void StrictExecutor::WorkerThread(){
}
}
if(lock_conflict){
- for(auto& key : held_locks_){
+ for(auto& key : held_locks){
lock_map_[key]=0;
}
lock_taken_.notify_all();
//Put thread to sleep until another thread releases locks or time
expires
- if (lock_taken_.wait_for(map_lock_,
std::chrono::microseconds(100),[this] { return is_stop_; })) {
- map_lock_.unlock();
+ if (lock_taken_.wait_for(map_lock,
std::chrono::microseconds(100),[this] { return is_stop_; })) {
+ map_lock.unlock();
return;
}
}
@@ -110,34 +113,34 @@ void StrictExecutor::WorkerThread(){
held_locks.insert(kv_request.key());
}
else{
- while(lock_map[kv_request.key()]==1){
- if (lock_taken_.wait_for(map_lock_,
std::chrono::microseconds(100),[this] { return is_stop_; })) {
- map_lock_.unlock();
+ while(lock_map_[kv_request.key()]==1){
+ if (lock_taken_.wait_for(map_lock,
std::chrono::microseconds(100),[this] { return is_stop_; })) {
+ map_lock.unlock();
return;
}
}
}
}
- map_lock_.unlock();
+ map_lock.unlock();
//Once all locks are grabbed, execute transaction
std::unique_ptr<std::string> response =
- ExecuteData(kv_request);
+ ExecuteData(kv_request);
if (response == nullptr) {
response = std::make_unique<std::string>();
}
//Release locks that were grabbed
- map_lock_.lock();
- for(auto& key : held_locks_){
+ map_lock.lock();
+ for(auto& key : held_locks){
lock_map_[key]=0;
}
- map_lock_.unlock();
+ map_lock.unlock();
lock_taken_.notify_all();
//Add response
response_lock_.lock();
- this->batch_response->add_response()->swap(*response);
+ this->batch_response_->add_response()->swap(*response);
//Need to check all threads are done processing, and if all threads are
done + queue is empty, wake main thread
//Maybe check number of responses in batch response, if equals number of
requests, all requests are processed
if(this->batch_response_.get()->response_size()==request_count_){
@@ -151,28 +154,30 @@ std::unique_ptr<BatchUserResponse>
StrictExecutor::ExecuteBatch(
const BatchUserRequest& request) {
this->batch_response_ = std::make_unique<BatchUserResponse>();
request_count_=0;
+ std::unique_lock<std::mutex> queue_lock(queue_lock_);
for (const auto& sub_request : request.user_requests()) {
KVRequest kv_request;
if (!kv_request.ParseFromString(sub_request.request().data())) {
LOG(ERROR) << "parse data fail";
return std::move(this->batch_response_);
}
- queue_lock_.lock();
+ queue_lock.lock();
request_queue_.push(std::move(kv_request));
request_count_++;
- queue_lock_.unlock();
+ queue_lock.unlock();
//Might want to move this/add new condition to avoid unecessary broadcasts
- not_empty_queue_.notify_all():
+ not_empty_queue_.notify_all();
}
- response_lock_.lock();
+ std::unique_lock<std::mutex> response_lock(response_lock_);
+ response_lock.lock();
while(this->batch_response_.get()->response_size()!=request_count_){
- if (empty_queue_.wait_for(queue_lock_, std::chrono::microseconds(100),
+ if (empty_queue_.wait_for(queue_lock, std::chrono::microseconds(100),
[this] { return is_stop_; })) {
- response_lock_.unlock();
+ response_lock.unlock();
return std::move(this->batch_response_);
}
}
- response_lock_.unlock();
+ response_lock.unlock();
return std::move(this->batch_response_);
}
diff --git a/executor/kv/strict_executor.h b/executor/kv/strict_executor.h
index 1cefe09b..c973235f 100644
--- a/executor/kv/strict_executor.h
+++ b/executor/kv/strict_executor.h
@@ -21,20 +21,29 @@
#include <map>
#include <optional>
+#include <atomic>
+#include <barrier>
+#include <list>
+#include <map>
+#include <memory>
+#include <optional>
+#include <thread>
#include <unordered_map>
-
+#include <vector>
+#include <queue>
+#include <condition_variable>
#include "chain/storage/storage.h"
#include "executor/common/transaction_manager.h"
#include "proto/kv/kv.pb.h"
-
+using namespace std;
namespace resdb {
class StrictExecutor : public TransactionManager {
public:
StrictExecutor(std::unique_ptr<Storage> storage);
- virtual ~StrictExecutor() = default;
+ virtual ~StrictExecutor();
- std::unique_ptr<std::string> ExecuteData(const std::string& request)
override;
+ std::unique_ptr<std::string> ExecuteData(const KVRequest& kv_request);
std::unique_ptr<BatchUserResponse> ExecuteBatch(
const BatchUserRequest& request) override;
@@ -61,7 +70,7 @@ class StrictExecutor : public TransactionManager {
std::mutex queue_lock_;
std::mutex map_lock_;
std::mutex response_lock_;
- std::queue request_queue_;
+ std::queue<KVRequest> request_queue_;
std::condition_variable empty_queue_;
std::condition_variable not_empty_queue_;
std::condition_variable lock_taken_;