This is an automated email from the ASF dual-hosted git repository.
saipranav pushed a commit to branch QueccBranch
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/QueccBranch by this push:
new 27cf36d2 "Added LevelDB back to benchmark for more accurate comparing"
27cf36d2 is described below
commit 27cf36d21d37a352a78da2c46ce32a21f7f340f8
Author: Saipranav Kotamreddy <[email protected]>
AuthorDate: Sat Apr 27 14:34:21 2024 -0700
"Added LevelDB back to benchmark for more accurate comparing"
---
benchmark/protocols/quecc/BUILD | 1 +
benchmark/protocols/quecc/quecc_benchmark.cpp | 11 +++-
executor/kv/kv_executor.cpp | 93 ++++++++++++++++++++-------
executor/kv/kv_executor.h | 2 +
executor/kv/quecc_executor.cpp | 31 +++++++--
5 files changed, 106 insertions(+), 32 deletions(-)
diff --git a/benchmark/protocols/quecc/BUILD b/benchmark/protocols/quecc/BUILD
index b268760e..65c627b1 100644
--- a/benchmark/protocols/quecc/BUILD
+++ b/benchmark/protocols/quecc/BUILD
@@ -12,5 +12,6 @@ cc_binary(
"//platform/config:resdb_config_utils",
"//proto/kv:kv_cc_proto",
"//chain/storage:memory_db",
+ "//chain/storage:leveldb"
],
)
diff --git a/benchmark/protocols/quecc/quecc_benchmark.cpp
b/benchmark/protocols/quecc/quecc_benchmark.cpp
index ebe7e696..5e9f4fe8 100644
--- a/benchmark/protocols/quecc/quecc_benchmark.cpp
+++ b/benchmark/protocols/quecc/quecc_benchmark.cpp
@@ -28,6 +28,7 @@
#include <chrono>
#include <cstdint>
#include <ctime>
+#include "chain/storage/leveldb.h"
#include "chain/storage/memory_db.h"
#include "executor/kv/kv_executor.h"
@@ -43,6 +44,7 @@ using resdb::KVExecutor;
using resdb::KVOperation;
using resdb::KVRequest;
using resdb::storage::MemoryDB;
+using resdb::storage::NewResLevelDB;
using resdb::QueccExecutor;
using resdb::ResConfigData;
using resdb::ResDBConfig;
@@ -128,6 +130,8 @@ BatchUserRequest RandomDistribution() {
}
int main(int argc, char** argv) {
+ //std::unique_ptr<Storage> storage = NewResLevelDB("/tmp/leveldb_test");
+
vector<BatchUserRequest> equal_split_array;
vector<BatchUserRequest> no_split_array;
vector<BatchUserRequest> random_split_array;
@@ -136,9 +140,10 @@ int main(int argc, char** argv) {
// no_split_array.push_back(NoDistribution());
// random_split_array.push_back(RandomDistribution());
}
- KVExecutor kv_executor=KVExecutor(std::make_unique<MemoryDB>());
-
- QueccExecutor quecc_executor=QueccExecutor(std::make_unique<MemoryDB>());
+ //KVExecutor kv_executor=KVExecutor(std::make_unique<MemoryDB>());
+ KVExecutor kv_executor=KVExecutor(NewResLevelDB("/tmp/leveldb_test"));
+ //QueccExecutor quecc_executor=QueccExecutor(std::make_unique<MemoryDB>());
+ QueccExecutor
quecc_executor=QueccExecutor(NewResLevelDB("/tmp/leveldb_test"));
std::unique_ptr<BatchUserResponse> response;
// Equal Split Comparison
diff --git a/executor/kv/kv_executor.cpp b/executor/kv/kv_executor.cpp
index d63c6dfa..f89e0a2c 100644
--- a/executor/kv/kv_executor.cpp
+++ b/executor/kv/kv_executor.cpp
@@ -35,31 +35,61 @@ std::unique_ptr<std::string> KVExecutor::ExecuteData(
LOG(ERROR) << "parse data fail";
return nullptr;
}
-
- if (kv_request.cmd() == Operation::SET) {
- Set(kv_request.key(), kv_request.value());
- } else if (kv_request.cmd() == Operation::GET) {
- kv_response.set_value(Get(kv_request.key()));
- } else if (kv_request.cmd() == Operation::GETALLVALUES) {
- kv_response.set_value(GetAllValues());
- } else if (kv_request.cmd() == Operation::GETRANGE) {
- kv_response.set_value(GetRange(kv_request.key(), kv_request.value()));
- } else if (kv_request.cmd() == Operation::SET_WITH_VERSION) {
- SetWithVersion(kv_request.key(), kv_request.value(), kv_request.version());
- } else if (kv_request.cmd() == Operation::GET_WITH_VERSION) {
- GetWithVersion(kv_request.key(), kv_request.version(),
- kv_response.mutable_value_info());
- } else if (kv_request.cmd() == Operation::GET_ALL_ITEMS) {
- GetAllItems(kv_response.mutable_items());
- } else if (kv_request.cmd() == Operation::GET_KEY_RANGE) {
- GetKeyRange(kv_request.min_key(), kv_request.max_key(),
- kv_response.mutable_items());
- } else if (kv_request.cmd() == Operation::GET_HISTORY) {
- GetHistory(kv_request.key(), kv_request.min_version(),
- kv_request.max_version(), kv_response.mutable_items());
- } else if (kv_request.cmd() == Operation::GET_TOP) {
- GetTopHistory(kv_request.key(), kv_request.top_number(),
+ if (kv_request.ops_size()) {
+ for(const auto& op : kv_request.ops()){
+ if (op.cmd() == Operation::SET) {
+ Set(op.key(), op.value());
+ } else if (op.cmd() == Operation::GET) {
+ kv_response.set_value(Get(op.key()));
+ } else if (op.cmd() == Operation::GETALLVALUES) {
+ kv_response.set_value(GetAllValues());
+ } else if (op.cmd() == Operation::GETRANGE) {
+ kv_response.set_value(GetRange(op.key(), op.value()));
+ } else if (op.cmd() == Operation::SET_WITH_VERSION) {
+ SetWithVersion(op.key(), op.value(), kv_request.version());
+ } else if (op.cmd() == Operation::GET_WITH_VERSION) {
+ GetWithVersion(op.key(), kv_request.version(),
+ kv_response.mutable_value_info());
+ } else if (op.cmd() == Operation::GET_ALL_ITEMS) {
+ GetAllItems(kv_response.mutable_items());
+ } else if (op.cmd() == Operation::GET_KEY_RANGE) {
+ GetKeyRange(kv_request.min_key(), kv_request.max_key(),
+ kv_response.mutable_items());
+ } else if (op.cmd() == Operation::GET_HISTORY) {
+ GetHistory(op.key(), kv_request.min_version(),
+ kv_request.max_version(), kv_response.mutable_items());
+ } else if (op.cmd() == Operation::GET_TOP) {
+ GetTopHistory(op.key(), kv_request.top_number(),
+ kv_response.mutable_items());
+ }
+ }
+ }
+ else{
+ if (kv_request.cmd() == Operation::SET) {
+ Set(kv_request.key(), kv_request.value());
+ } else if (kv_request.cmd() == Operation::GET) {
+ kv_response.set_value(Get(kv_request.key()));
+ } else if (kv_request.cmd() == Operation::GETALLVALUES) {
+ kv_response.set_value(GetAllValues());
+ } else if (kv_request.cmd() == Operation::GETRANGE) {
+ kv_response.set_value(GetRange(kv_request.key(), kv_request.value()));
+ } else if (kv_request.cmd() == Operation::SET_WITH_VERSION) {
+ SetWithVersion(kv_request.key(), kv_request.value(),
kv_request.version());
+ } else if (kv_request.cmd() == Operation::GET_WITH_VERSION) {
+ GetWithVersion(kv_request.key(), kv_request.version(),
+ kv_response.mutable_value_info());
+ } else if (kv_request.cmd() == Operation::GET_ALL_ITEMS) {
+ GetAllItems(kv_response.mutable_items());
+ } else if (kv_request.cmd() == Operation::GET_KEY_RANGE) {
+ GetKeyRange(kv_request.min_key(), kv_request.max_key(),
kv_response.mutable_items());
+ } else if (kv_request.cmd() == Operation::GET_HISTORY) {
+ GetHistory(kv_request.key(), kv_request.min_version(),
+ kv_request.max_version(), kv_response.mutable_items());
+ } else if (kv_request.cmd() == Operation::GET_TOP) {
+ GetTopHistory(kv_request.key(), kv_request.top_number(),
+ kv_response.mutable_items());
+ }
}
std::unique_ptr<std::string> resp_str = std::make_unique<std::string>();
@@ -143,5 +173,20 @@ void KVExecutor::GetTopHistory(const std::string& key, int
top_number,
item->mutable_value_info()->set_version(it.second);
}
}
+std::unique_ptr<BatchUserResponse> KVExecutor::ExecuteBatch(
+ const BatchUserRequest& request) {
+ std::unique_ptr<BatchUserResponse> batch_response =
+ std::make_unique<BatchUserResponse>();
+ for (auto& sub_request : request.user_requests()) {
+ std::unique_ptr<std::string> response =
+ ExecuteData(sub_request.request().data());
+ if (response == nullptr) {
+ response = std::make_unique<std::string>();
+ }
+ batch_response->add_response()->swap(*response);
+ }
+
+ return batch_response;
+}
} // namespace resdb
diff --git a/executor/kv/kv_executor.h b/executor/kv/kv_executor.h
index 0fda88ae..3a078995 100644
--- a/executor/kv/kv_executor.h
+++ b/executor/kv/kv_executor.h
@@ -35,6 +35,8 @@ class KVExecutor : public TransactionManager {
virtual ~KVExecutor() = default;
std::unique_ptr<std::string> ExecuteData(const std::string& request)
override;
+ std::unique_ptr<BatchUserResponse> ExecuteBatch(
+ const BatchUserRequest& request) override;
protected:
virtual void Set(const std::string& key, const std::string& value);
diff --git a/executor/kv/quecc_executor.cpp b/executor/kv/quecc_executor.cpp
index 727d895d..ce11ce7b 100644
--- a/executor/kv/quecc_executor.cpp
+++ b/executor/kv/quecc_executor.cpp
@@ -46,14 +46,14 @@
using namespace std;
using resdb::KVRequest;
// 4 is hardcoded for thread count for now, eventually will use config value
-barrier execute_barrier(5);
+barrier execute_barrier(4);
mutex results_mutex;
namespace resdb {
QueccExecutor::QueccExecutor(std::unique_ptr<Storage> storage)
: storage_(std::move(storage)) {
- thread_count_ = 5;
+ thread_count_ = 4;
atomic<int> ready_planner_count_(0);
std::unique_ptr<BatchUserResponse> batch_output =
std::make_unique<BatchUserResponse>();
@@ -156,6 +156,7 @@ void QueccExecutor::PlannerThread(const int thread_number) {
const int& range_being_executed = thread_number;
execute_barrier.arrive_and_wait();
+ auto start_time = std::chrono::high_resolution_clock::now();
// Executor
for (int priority = 0; priority < thread_count_; priority++) {
@@ -170,6 +171,10 @@ void QueccExecutor::PlannerThread(const int thread_number)
{
}
this->sorted_transactions_[priority][range_being_executed].clear();
}
+ auto end_time = std::chrono::high_resolution_clock::now();
+ auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
+ end_time - start_time);
+ //printf("Time Taken 5: %d\n", (int)duration.count());
// Lock used to minimize conflicts of adding to batchresponse
results_mutex.lock();
@@ -187,6 +192,8 @@ void QueccExecutor::PlannerThread(const int thread_number) {
std::unique_ptr<BatchUserResponse> QueccExecutor::ExecuteBatch(
const BatchUserRequest& request) {
+ auto start_time = std::chrono::high_resolution_clock::now();
+
this->batch_response_ = std::make_unique<BatchUserResponse>();
rid_to_range_.clear();
transaction_tracker_.clear();
@@ -206,7 +213,6 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
LOG(ERROR) << "parse data fail";
return std::move(this->batch_response_);
}
-
// printf("txn id: %lu\n", kv_request.txn_id());
// printf("kv_request size: %d\n", kv_request.ops_size());
if (kv_request.ops_size()) {
@@ -215,7 +221,7 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
KVOperation newOp;
newOp.transaction_number=txn_id;
newOp.op= op;
- operation_list_.push_back(newOp);
+ operation_list_.push_back(std::move(newOp));
if(!key_weight_.count(op.key())){
key_weight_[op.key()]=0;
}
@@ -235,7 +241,7 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
newOp.op.set_cmd(kv_request.cmd());
newOp.op.set_key(kv_request.key());
newOp.op.set_value(kv_request.value());
- operation_list_.push_back(newOp);
+ operation_list_.push_back(std::move(newOp));
if(kv_request.cmd() == Operation::SET){
key_weight_[kv_request.key()]=key_weight_[kv_request.key()]+5;
@@ -248,7 +254,15 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
transaction_tracker_[txn_id]=1;
}
txn_id++;
+ auto end_time = std::chrono::high_resolution_clock::now();
+ auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
+ end_time - start_time);
+ //printf("Time Taken 5: %d\n", (int)duration.count());
}
+ auto end_time = std::chrono::high_resolution_clock::now();
+ auto duration = std::chrono::duration_cast<std::chrono::microseconds>(
+ end_time - start_time);
+ printf("Time Taken 1: %d\n", (int)duration.count());
int planner_vector_size =
(operation_list_.size() + thread_count_ - 1) / thread_count_;
@@ -269,6 +283,10 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
}
*/
CreateRanges();
+ end_time = std::chrono::high_resolution_clock::now();
+ duration = std::chrono::duration_cast<std::chrono::microseconds>(
+ end_time - start_time);
+ printf("Time Taken 2: %d\n", (int)duration.count());
ready_planner_count_.fetch_add(thread_count_);
// Allows planner threads to start consuming
@@ -276,6 +294,9 @@ std::unique_ptr<BatchUserResponse>
QueccExecutor::ExecuteBatch(
batch_ready_[i] = true;
}
cv_.notify_all();
+ end_time = std::chrono::high_resolution_clock::now();
+ duration = std::chrono::duration_cast<std::chrono::microseconds>(
+ end_time - start_time);
// Wait for threads to finish to get batch response
while (ready_planner_count_.load() != 0) {