This is an automated email from the ASF dual-hosted git repository.

saipranav pushed a commit to branch QueccBranch
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git


The following commit(s) were added to refs/heads/QueccBranch by this push:
     new 7392befa "Added some framework for 2PL testing"
7392befa is described below

commit 7392befa373e8c377ca27709d7032a66cf7e1c86
Author: Saipranav Kotamreddy <[email protected]>
AuthorDate: Thu May 2 17:32:53 2024 -0700

    "Added some framework for 2PL testing"
---
 executor/kv/strict_executor.cpp | 206 ++++++++++++++++++++++++++++++++++++++++
 executor/kv/strict_executor.h   |  68 +++++++++++++
 2 files changed, 274 insertions(+)

diff --git a/executor/kv/strict_executor.cpp b/executor/kv/strict_executor.cpp
new file mode 100644
index 00000000..b5c2e025
--- /dev/null
+++ b/executor/kv/strict_executor.cpp
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "executor/kv/kv_executor.h"
+
+#include <glog/logging.h>
+
+namespace resdb {
+
+StrictExecutor::StrictExecutor(std::unique_ptr<Storage> storage)
+    : storage_(std::move(storage)) {
+      int thread_count_=4;
+      for(int i=0; i<thread_count_;i++){
+        std::thread planner(&StrictExecutor::WorkerThread, this, NULL);
+
+        // thread pinning
+        cpu_set_t cpuset;
+        CPU_ZERO(&cpuset);
+        CPU_SET(thread_number, &cpuset);
+        int status = pthread_setaffinity_np(planner.native_handle(),
+                                            sizeof(cpu_set_t), &cpuset);
+        thread_list_.push_back(move(planner));
+      }
+
+    }
+
+void StrictExecutor::WorkerThread(){
+  
+}
+
+std::unique_ptr<BatchUserResponse> StrictExecutor::ExecuteBatch(
+    const BatchUserRequest& request) {
+  std::unique_ptr<BatchUserResponse> batch_response =
+      std::make_unique<BatchUserResponse>();
+  for (auto& sub_request : request.user_requests()) {
+    std::unique_ptr<std::string> response =
+        ExecuteData(sub_request.request().data());
+    if (response == nullptr) {
+      response = std::make_unique<std::string>();
+    }
+    batch_response->add_response()->swap(*response);
+  }
+
+  return batch_response;
+}
+
+std::unique_ptr<std::string> StrictExecutor::ExecuteData(
+    const KVRequest& kv_request) {
+  KVResponse kv_response;
+
+  if (kv_request.ops_size()) {
+    for(const auto& op : kv_request.ops()){
+      if (op.cmd() == Operation::SET) {
+        Set(op.key(), op.value());
+      } else if (op.cmd() == Operation::GET) {
+        kv_response.set_value(Get(op.key()));
+      } else if (op.cmd() == Operation::GETALLVALUES) {
+        kv_response.set_value(GetAllValues());
+      } else if (op.cmd() == Operation::GETRANGE) {
+        kv_response.set_value(GetRange(op.key(), op.value()));
+      } else if (op.cmd() == Operation::SET_WITH_VERSION) {
+        SetWithVersion(op.key(), op.value(), kv_request.version());
+      } else if (op.cmd() == Operation::GET_WITH_VERSION) {
+        GetWithVersion(op.key(), kv_request.version(),
+                      kv_response.mutable_value_info());
+      } else if (op.cmd() == Operation::GET_ALL_ITEMS) {
+        GetAllItems(kv_response.mutable_items());
+      } else if (op.cmd() == Operation::GET_KEY_RANGE) {
+        GetKeyRange(kv_request.min_key(), kv_request.max_key(),
+                    kv_response.mutable_items());
+      } else if (op.cmd() == Operation::GET_HISTORY) {
+        GetHistory(op.key(), kv_request.min_version(),
+                  kv_request.max_version(), kv_response.mutable_items());
+      } else if (op.cmd() == Operation::GET_TOP) {
+        GetTopHistory(op.key(), kv_request.top_number(),
+                      kv_response.mutable_items());
+      }
+    }
+  }
+  else{
+    if (kv_request.cmd() == Operation::SET) {
+      Set(kv_request.key(), kv_request.value());
+    } else if (kv_request.cmd() == Operation::GET) {
+      kv_response.set_value(Get(kv_request.key()));
+    } else if (kv_request.cmd() == Operation::GETALLVALUES) {
+      kv_response.set_value(GetAllValues());
+    } else if (kv_request.cmd() == Operation::GETRANGE) {
+      kv_response.set_value(GetRange(kv_request.key(), kv_request.value()));
+    } else if (kv_request.cmd() == Operation::SET_WITH_VERSION) {
+      SetWithVersion(kv_request.key(), kv_request.value(), 
kv_request.version());
+    } else if (kv_request.cmd() == Operation::GET_WITH_VERSION) {
+      GetWithVersion(kv_request.key(), kv_request.version(),
+                    kv_response.mutable_value_info());
+    } else if (kv_request.cmd() == Operation::GET_ALL_ITEMS) {
+      GetAllItems(kv_response.mutable_items());
+    } else if (kv_request.cmd() == Operation::GET_KEY_RANGE) {
+      GetKeyRange(kv_request.min_key(), kv_request.max_key(),
+                  kv_response.mutable_items());
+    } else if (kv_request.cmd() == Operation::GET_HISTORY) {
+      GetHistory(kv_request.key(), kv_request.min_version(),
+                kv_request.max_version(), kv_response.mutable_items());
+    } else if (kv_request.cmd() == Operation::GET_TOP) {
+      GetTopHistory(kv_request.key(), kv_request.top_number(),
+                    kv_response.mutable_items());
+    }
+  }
+
+  std::unique_ptr<std::string> resp_str = std::make_unique<std::string>();
+  if (!kv_response.SerializeToString(resp_str.get())) {
+    return nullptr;
+  }
+  return resp_str;
+}
+
+void StrictExecutor::Set(const std::string& key, const std::string& value) {
+  storage_->SetValue(key, value);
+}
+
+std::string StrictExecutor::Get(const std::string& key) {
+  return storage_->GetValue(key);
+}
+
+std::string StrictExecutor::GetAllValues() { return storage_->GetAllValues(); }
+
+// Get values on a range of keys
+std::string StrictExecutor::GetRange(const std::string& min_key,
+                                 const std::string& max_key) {
+  return storage_->GetRange(min_key, max_key);
+}
+
+void StrictExecutor::SetWithVersion(const std::string& key,
+                                const std::string& value, int version) {
+  storage_->SetValueWithVersion(key, value, version);
+}
+
+void StrictExecutor::GetWithVersion(const std::string& key, int version,
+                                ValueInfo* info) {
+  std::pair<std::string, int> ret = storage_->GetValueWithVersion(key, 
version);
+  info->set_value(ret.first);
+  info->set_version(ret.second);
+}
+
+void StrictExecutor::GetAllItems(Items* items) {
+  const std::map<std::string, std::pair<std::string, int>>& ret =
+      storage_->GetAllItems();
+  for (auto it : ret) {
+    Item* item = items->add_item();
+    item->set_key(it.first);
+    item->mutable_value_info()->set_value(it.second.first);
+    item->mutable_value_info()->set_version(it.second.second);
+  }
+}
+
+void StrictExecutor::GetKeyRange(const std::string& min_key,
+                             const std::string& max_key, Items* items) {
+  const std::map<std::string, std::pair<std::string, int>>& ret =
+      storage_->GetKeyRange(min_key, max_key);
+  for (auto it : ret) {
+    Item* item = items->add_item();
+    item->set_key(it.first);
+    item->mutable_value_info()->set_value(it.second.first);
+    item->mutable_value_info()->set_version(it.second.second);
+  }
+}
+
+void StrictExecutor::GetHistory(const std::string& key, int min_version,
+                            int max_version, Items* items) {
+  const std::vector<std::pair<std::string, int>>& ret =
+      storage_->GetHistory(key, min_version, max_version);
+  for (auto it : ret) {
+    Item* item = items->add_item();
+    item->set_key(key);
+    item->mutable_value_info()->set_value(it.first);
+    item->mutable_value_info()->set_version(it.second);
+  }
+}
+
+void StrictExecutor::GetTopHistory(const std::string& key, int top_number,
+                               Items* items) {
+  const std::vector<std::pair<std::string, int>>& ret =
+      storage_->GetTopHistory(key, top_number);
+  for (auto it : ret) {
+    Item* item = items->add_item();
+    item->set_key(key);
+    item->mutable_value_info()->set_value(it.first);
+    item->mutable_value_info()->set_version(it.second);
+  }
+}
+
+}  // namespace resdb
diff --git a/executor/kv/strict_executor.h b/executor/kv/strict_executor.h
new file mode 100644
index 00000000..5722a1c5
--- /dev/null
+++ b/executor/kv/strict_executor.h
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <optional>
+#include <unordered_map>
+
+#include "chain/storage/storage.h"
+#include "executor/common/transaction_manager.h"
+#include "proto/kv/kv.pb.h"
+
+namespace resdb {
+
+class StrictExecutor : public TransactionManager {
+ public:
+  StrictExecutor(std::unique_ptr<Storage> storage);
+  virtual ~StrictExecutor() = default;
+
+  std::unique_ptr<std::string> ExecuteData(const std::string& request) 
override;
+  std::unique_ptr<BatchUserResponse> ExecuteBatch(
+      const BatchUserRequest& request) override;
+
+ protected:
+  virtual void Set(const std::string& key, const std::string& value);
+  std::string Get(const std::string& key);
+  std::string GetAllValues();
+  std::string GetRange(const std::string& min_key, const std::string& max_key);
+
+  void SetWithVersion(const std::string& key, const std::string& value,
+                      int version);
+  void GetWithVersion(const std::string& key, int version, ValueInfo* info);
+  void GetAllItems(Items* items);
+  void GetKeyRange(const std::string& min_key, const std::string& max_key,
+                   Items* items);
+  void GetHistory(const std::string& key, int min_key, int max_key,
+                  Items* items);
+  void GetTopHistory(const std::string& key, int top_number, Items* items);
+  void WorkerThread();
+
+ private:
+  std::unique_ptr<Storage> storage_;
+  std::unordered_map<std::string, int> lock_map_;
+  std::mutex lockMutex_;
+  std::priority_queue request_queue_;
+  bool is_stop_ = false;
+  int thread_count_;
+  vector<thread> thread_list_;
+};
+
+}  // namespace resdb

Reply via email to