This is an automated email from the ASF dual-hosted git repository.
junchao pushed a commit to branch recovery_ckpt
in repository https://gitbox.apache.org/repos/asf/incubator-resilientdb.git
The following commit(s) were added to refs/heads/recovery_ckpt by this push:
new e6b1c292 add seq
e6b1c292 is described below
commit e6b1c292dd3f684c93b3e85e94bd12f19c55626c
Author: cjcchen <[email protected]>
AuthorDate: Thu Dec 4 13:42:08 2025 +0000
add seq
---
.bazelversion | 1 -
chain/storage/kv_storage_test.cpp | 63 ++++++++++++++
chain/storage/leveldb.cpp | 96 ++++++++++++++++++----
chain/storage/leveldb.h | 4 +-
chain/storage/memory_db.cpp | 58 ++++++++++---
chain/storage/memory_db.h | 10 ++-
chain/storage/proto/kv.proto | 1 +
chain/storage/storage.h | 9 +-
executor/common/transaction_manager.cpp | 10 +++
executor/common/transaction_manager.h | 9 +-
executor/kv/kv_executor.cpp | 18 ++--
.../consensus/execution/transaction_executor.cpp | 8 +-
.../consensus/ordering/pbft/message_manager.cpp | 14 +---
13 files changed, 237 insertions(+), 64 deletions(-)
diff --git a/.bazelversion b/.bazelversion
deleted file mode 100644
index 09b254e9..00000000
--- a/.bazelversion
+++ /dev/null
@@ -1 +0,0 @@
-6.0.0
diff --git a/chain/storage/kv_storage_test.cpp
b/chain/storage/kv_storage_test.cpp
index 2e55161f..96a4fa52 100644
--- a/chain/storage/kv_storage_test.cpp
+++ b/chain/storage/kv_storage_test.cpp
@@ -126,6 +126,69 @@ TEST_P(KVStorageTest, GetAllValueWithVersion) {
}
}
+TEST_P(KVStorageTest, SetValueWithSeq) {  // Checks seq ordering on writes and exact-seq reads for a single key.
+ EXPECT_EQ(storage->SetValueWithSeq("test_key", "test_value", 2), 0);
+ // A write whose seq (1) is below the newest stored seq (2) is expected to be rejected with -2.
+ EXPECT_EQ(storage->SetValueWithSeq("test_key", "test_value", 1), -2);
+ // The rejected seq 1 reads back as ("", 0); the stored seq 2 returns its value.
+ EXPECT_EQ(storage->GetValueWithSeq("test_key", static_cast<uint64_t>(1)),
+ std::make_pair(std::string(""), static_cast<uint64_t>(0)));
+ EXPECT_EQ(storage->GetValueWithSeq("test_key", 2),
+ std::make_pair(std::string("test_value"),
static_cast<uint64_t>(2)));
+ // A higher seq is accepted and added to the key's history.
+ EXPECT_EQ(storage->SetValueWithSeq("test_key", "test_value_v2", 3), 0);
+ // Seq 1 is still absent after the second successful write.
+ EXPECT_EQ(storage->GetValueWithSeq("test_key", 1),
+ std::make_pair(std::string(""), static_cast<uint64_t>(0)));
+ // The earlier entry stays readable at its own seq.
+ EXPECT_EQ(storage->GetValueWithSeq("test_key", 2),
+ std::make_pair(std::string("test_value"),
static_cast<uint64_t>(2)));
+ // The newest entry is readable at seq 3.
+ EXPECT_EQ(storage->GetValueWithSeq("test_key", 3),
+ std::make_pair(std::string("test_value_v2"),
static_cast<uint64_t>(3)));
+ // A seq that was never written (4) also reads back as ("", 0).
+ EXPECT_EQ(storage->GetValueWithSeq("test_key", 4),
+ std::make_pair(std::string(""), static_cast<uint64_t>(0)));
+}
+
+TEST_P(KVStorageTest, GetAllValueWithSeq) {  // Checks GetAllItemsWithSeq as entries and keys are added, then with a history cap.
+typedef std::vector<std::pair<std::string, uint64_t>> List;  // One key's history: (value, seq) pairs.
+ {  // Step 1: a single key with a single entry.
+ std::map<std::string, std::vector<std::pair<std::string, uint64_t>> >
expected_list{
+ std::make_pair("test_key", List{std::make_pair("test_value", 1)})};
+
+ EXPECT_EQ(storage->SetValueWithSeq("test_key", "test_value", 1), 0);
+ EXPECT_EQ(storage->GetAllItemsWithSeq(), expected_list);
+ }
+ // Step 2: a second write to the same key is listed after the first one.
+ {
+ std::map<std::string, std::vector<std::pair<std::string, uint64_t>> >
expected_list{
+ std::make_pair("test_key", List{std::make_pair("test_value", 1),
std::make_pair("test_value_v2", 2)})};
+ EXPECT_EQ(storage->SetValueWithSeq("test_key", "test_value_v2", 2), 0);
+ EXPECT_EQ(storage->GetAllItemsWithSeq(), expected_list);
+ }
+ // Step 3: a second key gets its own independent history.
+ {
+ std::map<std::string, std::vector<std::pair<std::string, uint64_t>> >
expected_list{
+ std::make_pair("test_key_v1", List{std::make_pair("test_value1", 1)}),
+ std::make_pair("test_key", List{std::make_pair("test_value",
1),std::make_pair("test_value_v2", 2)})};
+ EXPECT_EQ(storage->SetValueWithSeq("test_key_v1", "test_value1", 1), 0);
+ EXPECT_EQ(storage->GetAllItemsWithSeq(), expected_list);
+ }
+ // Step 4: with the history cap lowered to 2, two more writes to "test_key"
+ // are expected to leave only its two newest entries (seq 3 and 4).
+ {
+ storage->SetMaxHistoryNum(2);
+ std::map<std::string, std::vector<std::pair<std::string, uint64_t>> >
expected_list{
+ std::make_pair("test_key_v1", List{std::make_pair("test_value1", 1)}),
+ std::make_pair("test_key", List{std::make_pair("test_value3", 3),
+ std::make_pair("test_value4", 4)})};
+ EXPECT_EQ(storage->SetValueWithSeq("test_key", "test_value3", 3), 0);
+ EXPECT_EQ(storage->SetValueWithSeq("test_key", "test_value4", 4), 0);
+ EXPECT_EQ(storage->GetAllItemsWithSeq(), expected_list);
+ }
+}
+
TEST_P(KVStorageTest, GetKeyRange) {
EXPECT_EQ(storage->SetValueWithVersion("1", "value1", 0), 0);
EXPECT_EQ(storage->SetValueWithVersion("2", "value2", 0), 0);
diff --git a/chain/storage/leveldb.cpp b/chain/storage/leveldb.cpp
index f6122b5a..f000d6c0 100644
--- a/chain/storage/leveldb.cpp
+++ b/chain/storage/leveldb.cpp
@@ -93,6 +93,64 @@ ResLevelDB::~ResLevelDB() {
}
}
+// Appends (value, seq) to the serialized ValueHistory stored under `key`.
+//
+// The write is rejected when `seq` is lower than the seq of the newest stored
+// entry; an equal seq is accepted and appended. After appending, the oldest
+// entries are dropped so that at most max_history_ entries remain.
+//
+// Returns the result of SetValue() on success, or -2 when the stored bytes
+// cannot be parsed, `seq` is too small, or the updated history cannot be
+// serialized.
+int ResLevelDB::SetValueWithSeq(const std::string& key,
+                                const std::string& value, uint64_t seq) {
+  std::string value_str = GetValue(key);
+  ValueHistory history;
+  if (!history.ParseFromString(value_str)) {
+    LOG(ERROR) << "old_value parse fail";
+    return -2;
+  }
+
+  uint64_t last_seq = 0;
+  if (history.value_size() > 0) {
+    last_seq = history.value(history.value_size() - 1).seq();
+  }
+
+  if (last_seq > seq) {
+    LOG(ERROR) << "seq is small, last:" << last_seq << " new seq:" << seq;
+    return -2;
+  }
+
+  Value* new_value = history.add_value();
+  new_value->set_value(value);
+  new_value->set_seq(seq);
+
+  // Drop the oldest entries in a single call. Both operands are widened to a
+  // signed 64-bit type so an int is never compared against a uint32_t.
+  const int64_t excess = static_cast<int64_t>(history.value_size()) -
+                         static_cast<int64_t>(max_history_);
+  if (excess > 0) {
+    history.mutable_value()->DeleteSubrange(0, static_cast<int>(excess));
+  }
+
+  // NOTE(review): this is logged at ERROR severity on every successful
+  // write; confirm that is intended.
+  LOG(ERROR) << " set value, string:" << key << " seq:" << seq;
+  // Do not persist a buffer that failed to serialize.
+  if (!history.SerializeToString(&value_str)) {
+    LOG(ERROR) << "history serialize fail";
+    return -2;
+  }
+  return SetValue(key, value_str);
+}
+
+// Fetches the entry stored for `key` at sequence number `seq`.
+// seq == 0 selects the newest entry; any other seq must match exactly.
+// Yields ("", 0) if the stored bytes do not parse, the history is empty,
+// or no entry carries the requested seq.
+std::pair<std::string, uint64_t> ResLevelDB::GetValueWithSeq(
+    const std::string& key, uint64_t seq) {
+  const std::string serialized = GetValue(key);
+  ValueHistory history;
+  if (!history.ParseFromString(serialized)) {
+    LOG(ERROR) << "old_value parse fail";
+    return std::make_pair("", 0);
+  }
+
+  const int entry_count = history.value_size();
+  if (entry_count == 0) {
+    return std::make_pair("", 0);
+  }
+
+  if (seq == 0) {
+    const auto& newest = history.value(entry_count - 1);
+    return std::make_pair(newest.value(), newest.seq());
+  }
+
+  // Walk from the newest entry towards the oldest looking for an exact match.
+  int idx = entry_count;
+  while (idx-- > 0) {
+    const auto& entry = history.value(idx);
+    if (entry.seq() == seq) {
+      return std::make_pair(entry.value(), entry.seq());
+    }
+  }
+  return std::make_pair("", 0);
+}
+
+
int ResLevelDB::SetValue(const std::string& key, const std::string& value) {
if (block_cache_) {
block_cache_->Put(key, value);
@@ -133,21 +191,6 @@ std::string ResLevelDB::GetValue(const std::string& key) {
return value;
}
-std::string ResLevelDB::GetAllValues(void) {
- std::string values = "[";
- leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
- bool first_iteration = true;
- for (it->SeekToFirst(); it->Valid(); it->Next()) {
- if (!first_iteration) values.append(",");
- first_iteration = false;
- values.append(it->value().ToString());
- }
- values.append("]");
-
- delete it;
- return values;
-}
-
std::string ResLevelDB::GetRange(const std::string& min_key,
const std::string& max_key) {
std::string values = "[";
@@ -243,6 +286,29 @@ std::pair<std::string, int>
ResLevelDB::GetValueWithVersion(
history.value(last_idx).version());
}
+// Return a map of <key, list of <value, seq>>: one list element per entry of
+// the key's stored ValueHistory, in stored order. Records that fail to parse
+// or hold no entries are skipped.
+std::map<std::string, std::vector<std::pair<std::string, uint64_t>>>
+ResLevelDB::GetAllItemsWithSeq() {
+  std::map<std::string, std::vector<std::pair<std::string, uint64_t>>> resp;
+
+  leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
+  for (it->SeekToFirst(); it->Valid(); it->Next()) {
+    ValueHistory history;
+    if (!history.ParseFromString(it->value().ToString()) ||
+        history.value_size() == 0) {
+      LOG(ERROR) << "old_value parse fail";
+      continue;
+    }
+    LOG(ERROR) << "history value size:" << history.value_size();
+    // Resolve the key once per record set, and bind each entry by const
+    // reference so the Value message is not copied on every iteration.
+    auto& items = resp[it->key().ToString()];
+    items.reserve(items.size() + history.value_size());
+    for (const auto& entry : history.value()) {
+      items.emplace_back(entry.value(), entry.seq());
+    }
+  }
+  // The iterator is owned by the caller of NewIterator and must be released.
+  delete it;
+
+  return resp;
+}
+
+
// Return a map of <key, <value, version>>
std::map<std::string, std::pair<std::string, int>> ResLevelDB::GetAllItems() {
std::map<std::string, std::pair<std::string, int>> resp;
diff --git a/chain/storage/leveldb.h b/chain/storage/leveldb.h
index 6103ec20..3cf97c50 100644
--- a/chain/storage/leveldb.h
+++ b/chain/storage/leveldb.h
@@ -43,9 +43,10 @@ class ResLevelDB : public Storage {
ResLevelDB(std::optional<LevelDBInfo> config_data = std::nullopt);
virtual ~ResLevelDB();
+ int SetValueWithSeq(const std::string& key, const std::string& value,
uint64_t seq) override;
int SetValue(const std::string& key, const std::string& value) override;
std::string GetValue(const std::string& key) override;
- std::string GetAllValues(void) override;
+ std::pair<std::string,uint64_t> GetValueWithSeq(const std::string& key,
uint64_t seq) override;
std::string GetRange(const std::string& min_key,
const std::string& max_key) override;
@@ -55,6 +56,7 @@ class ResLevelDB : public Storage {
int version) override;
// Return a map of <key, <value, version>>
+ std::map<std::string, std::vector<std::pair<std::string, uint64_t>>>
GetAllItemsWithSeq() override;
std::map<std::string, std::pair<std::string, int>> GetAllItems() override;
std::map<std::string, std::pair<std::string, int>> GetKeyRange(
const std::string& min_key, const std::string& max_key) override;
diff --git a/chain/storage/memory_db.cpp b/chain/storage/memory_db.cpp
index 2d5b8725..c3e85133 100644
--- a/chain/storage/memory_db.cpp
+++ b/chain/storage/memory_db.cpp
@@ -33,18 +33,6 @@ int MemoryDB::SetValue(const std::string& key, const
std::string& value) {
return 0;
}
-std::string MemoryDB::GetAllValues(void) {
- std::string values = "[";
- bool first_iteration = true;
- for (auto kv : kv_map_) {
- if (!first_iteration) values.append(",");
- first_iteration = false;
- values.append(kv.second);
- }
- values.append("]");
- return values;
-}
-
std::string MemoryDB::GetRange(const std::string& min_key,
const std::string& max_key) {
std::string values = "[";
@@ -69,6 +57,41 @@ std::string MemoryDB::GetValue(const std::string& key) {
}
}
+// Looks up the value stored for `key` at sequence number `seq`.
+//
+// seq == 0 requests the newest entry. This matches
+// ResLevelDB::GetValueWithSeq and is what KVExecutor::Get relies on when it
+// passes 0. Any other seq must match a stored entry exactly.
+// Returns ("", 0) when the key has no history or the seq is not present.
+std::pair<std::string, uint64_t> MemoryDB::GetValueWithSeq(
+    const std::string& key, uint64_t seq) {
+  auto search_it = kv_map_with_seq_.find(key);
+  if (search_it == kv_map_with_seq_.end() || search_it->second.empty()) {
+    return std::make_pair("", 0);
+  }
+
+  const auto& history = search_it->second;
+  if (seq == 0) {
+    return history.back();
+  }
+
+  // SetValueWithSeq only appends entries whose seq is not lower than the
+  // newest one, so scan from the newest entry and stop as soon as the
+  // remaining entries are all older than the requested seq.
+  for (auto it = history.rbegin(); it != history.rend(); ++it) {
+    if (it->second == seq) {
+      return *it;
+    }
+    if (it->second < seq) {
+      break;
+    }
+  }
+  LOG(ERROR) << " key:" << key << " no seq:" << seq;
+  return std::make_pair("", 0);
+}
+
+// Appends (value, seq) to the in-memory history kept for `key`.
+//
+// Returns -2 without modifying anything when `seq` is lower than the seq of
+// the newest stored entry (an equal seq is accepted). Otherwise appends the
+// entry, drops the oldest entries until at most max_history_ remain, and
+// returns 0.
+int MemoryDB::SetValueWithSeq(const std::string& key,
+                              const std::string& value, uint64_t seq) {
+  auto it = kv_map_with_seq_.find(key);
+  // A key's list can be empty when max_history_ is 0 (the trim below removes
+  // every entry), so guard the back() access.
+  if (it != kv_map_with_seq_.end() && !it->second.empty() &&
+      it->second.back().second > seq) {
+    LOG(ERROR) << " value seq not match. key:" << key
+               << " db seq:" << it->second.back().second
+               << " new seq:" << seq;
+    return -2;
+  }
+
+  // Resolve (or create) the key's history once and work on the reference.
+  auto& history = kv_map_with_seq_[key];
+  history.emplace_back(value, seq);
+  while (history.size() > max_history_) {
+    history.pop_front();
+  }
+  return 0;
+}
+
int MemoryDB::SetValueWithVersion(const std::string& key,
const std::string& value, int version) {
auto it = kv_map_with_v_.find(key);
@@ -105,6 +128,17 @@ std::pair<std::string, int> MemoryDB::GetValueWithVersion(
return std::make_pair("", 0);
}
+// Builds a snapshot of every key's history as <key, list of <value, seq>>,
+// preserving each key's stored entry order.
+std::map<std::string, std::vector<std::pair<std::string, uint64_t>>>
+MemoryDB::GetAllItemsWithSeq() {
+  std::map<std::string, std::vector<std::pair<std::string, uint64_t>>> snapshot;
+  for (const auto& entry : kv_map_with_seq_) {
+    const auto& history = entry.second;
+    LOG(ERROR)<<" value num:"<<history.size();
+    // A key whose history is empty does not get an entry in the snapshot.
+    if (history.empty()) {
+      continue;
+    }
+    auto& copied = snapshot[entry.first];
+    copied.insert(copied.end(), history.begin(), history.end());
+  }
+  return snapshot;
+}
+
std::map<std::string, std::pair<std::string, int>> MemoryDB::GetAllItems() {
std::map<std::string, std::pair<std::string, int>> resp;
diff --git a/chain/storage/memory_db.h b/chain/storage/memory_db.h
index 372f4647..ca498227 100644
--- a/chain/storage/memory_db.h
+++ b/chain/storage/memory_db.h
@@ -53,10 +53,11 @@ class MemoryDB : public Storage {
public:
MemoryDB();
- int SetValue(const std::string& key, const std::string& value);
- std::string GetValue(const std::string& key);
+ int SetValueWithSeq(const std::string& key, const std::string& value,
uint64_t seq) override;
+ int SetValue(const std::string& key, const std::string& value) override;
+ std::string GetValue(const std::string& key) override;
+ std::pair<std::string, uint64_t> GetValueWithSeq(const std::string& key,
uint64_t seq) override;
- std::string GetAllValues() override;
std::string GetRange(const std::string& min_key,
const std::string& max_key) override;
@@ -66,6 +67,7 @@ class MemoryDB : public Storage {
int version) override;
// Return a map of <key, <value, version>>
+ std::map<std::string, std::vector<std::pair<std::string, uint64_t>>>
GetAllItemsWithSeq() override;
std::map<std::string, std::pair<std::string, int>> GetAllItems() override;
std::map<std::string, std::pair<std::string, int>> GetKeyRange(
const std::string& min_key, const std::string& max_key) override;
@@ -82,6 +84,8 @@ class MemoryDB : public Storage {
std::unordered_map<std::string, std::string> kv_map_;
std::unordered_map<std::string, std::list<std::pair<std::string, int>>>
kv_map_with_v_;
+ std::unordered_map<std::string, std::list<std::pair<std::string, uint64_t>>>
+ kv_map_with_seq_;
};
} // namespace storage
diff --git a/chain/storage/proto/kv.proto b/chain/storage/proto/kv.proto
index 8466b8dc..319d7fb0 100644
--- a/chain/storage/proto/kv.proto
+++ b/chain/storage/proto/kv.proto
@@ -24,6 +24,7 @@ package resdb.storage;
message Value {
bytes value = 1;
int32 version = 2;
+ uint64 seq = 3;  // Sequence number recorded with the value by SetValueWithSeq.
}
message ValueHistory {
diff --git a/chain/storage/storage.h b/chain/storage/storage.h
index 76e352de..445f0c1f 100644
--- a/chain/storage/storage.h
+++ b/chain/storage/storage.h
@@ -31,8 +31,10 @@ class Storage {
virtual ~Storage() = default;
virtual int SetValue(const std::string& key, const std::string& value) = 0;
+ virtual int SetValueWithSeq(const std::string& key, const std::string&
value, uint64_t seq) = 0;
virtual std::string GetValue(const std::string& key) = 0;
- virtual std::string GetAllValues() = 0;
+ virtual std::pair<std::string, uint64_t> GetValueWithSeq(
+ const std::string& key, uint64_t seq) = 0;
virtual std::string GetRange(const std::string& min_key,
const std::string& max_key) = 0;
@@ -42,6 +44,7 @@ class Storage {
const std::string& key, int version) = 0;
// Return a map of <key, <value, version>>
+ virtual std::map<std::string, std::vector<std::pair<std::string, uint64_t>>>
GetAllItemsWithSeq() = 0;
virtual std::map<std::string, std::pair<std::string, int>> GetAllItems() = 0;
virtual std::map<std::string, std::pair<std::string, int>> GetKeyRange(
const std::string& min_key, const std::string& max_key) = 0;
@@ -55,6 +58,10 @@ class Storage {
const std::string& key, int number) = 0;
virtual bool Flush() { return true; };
+
+ void SetMaxHistoryNum(int num) { max_history_ = num; }  // Sets the per-key history cap; a negative num converts to a large unsigned value.
+ protected:
+ uint32_t max_history_ = 10;  // Maximum entries SetValueWithSeq keeps per key before dropping the oldest.
};
} // namespace resdb
diff --git a/executor/common/transaction_manager.cpp
b/executor/common/transaction_manager.cpp
index 74df05c9..372a00f3 100644
--- a/executor/common/transaction_manager.cpp
+++ b/executor/common/transaction_manager.cpp
@@ -62,6 +62,11 @@ std::unique_ptr<std::string>
TransactionManager::ExecuteRequest(
return nullptr;
}
+// Executes already-parsed requests on behalf of sequence number `seq`.
+//
+// Stores `seq` in seq_ before executing, as ExecuteBatchWithSeq does, so
+// that code which reads seq_ while executing (for example KVExecutor::Set)
+// sees this batch's sequence number instead of a stale one.
+// Returns whatever ExecuteBatchData returns for `requests`.
+std::vector<std::unique_ptr<std::string>>
+TransactionManager::ExecuteBatchDataWithSeq(
+    uint64_t seq,
+    const std::vector<std::unique_ptr<google::protobuf::Message>>& requests) {
+  seq_ = seq;
+  return ExecuteBatchData(requests);
+}
+
std::vector<std::unique_ptr<std::string>> TransactionManager::ExecuteBatchData(
const std::vector<std::unique_ptr<google::protobuf::Message>>& requests) {
// LOG(ERROR)<<"execute data:"<<requests.size();
@@ -78,6 +83,11 @@ std::vector<std::unique_ptr<std::string>>
TransactionManager::ExecuteBatchData(
return ret;
}
+std::unique_ptr<BatchUserResponse> TransactionManager::ExecuteBatchWithSeq(  // Seq-aware entry point: records seq, then delegates to ExecuteBatch.
+ uint64_t seq, const BatchUserRequest& request) {
+ seq_ = seq;  // Stored before execution so code reading seq_ during ExecuteBatch sees this batch's seq.
+ return ExecuteBatch(request);
+}
std::unique_ptr<BatchUserResponse> TransactionManager::ExecuteBatch(
const BatchUserRequest& request) {
diff --git a/executor/common/transaction_manager.h
b/executor/common/transaction_manager.h
index b1d564ac..c0436faf 100644
--- a/executor/common/transaction_manager.h
+++ b/executor/common/transaction_manager.h
@@ -37,14 +37,18 @@ class TransactionManager {
TransactionManager(bool is_out_of_order = false, bool need_response = true);
virtual ~TransactionManager() = default;
- virtual std::unique_ptr<BatchUserResponse> ExecuteBatch(
- const BatchUserRequest& request);
+ std::unique_ptr<BatchUserResponse> ExecuteBatchWithSeq(uint64_t seq, const
BatchUserRequest& request);
+
+ virtual std::unique_ptr<BatchUserResponse> ExecuteBatch(const
BatchUserRequest& request);
std::unique_ptr<std::vector<std::unique_ptr<google::protobuf::Message>>>
Prepare(const BatchUserRequest& request);
+ std::vector<std::unique_ptr<std::string>> ExecuteBatchDataWithSeq(
+ uint64_t seq, const
std::vector<std::unique_ptr<google::protobuf::Message>>& requests);
std::vector<std::unique_ptr<std::string>> ExecuteBatchData(
const std::vector<std::unique_ptr<google::protobuf::Message>>& requests);
+
virtual std::unique_ptr<std::string> ExecuteData(const std::string& request);
bool IsOutOfOrder();
@@ -58,6 +62,7 @@ class TransactionManager {
const std::string& data);
virtual std::unique_ptr<std::string> ExecuteRequest(
const google::protobuf::Message& request);
+ uint64_t seq_ = 0;
private:
bool is_out_of_order_ = false;
bool need_response_ = true;
diff --git a/executor/kv/kv_executor.cpp b/executor/kv/kv_executor.cpp
index c587f592..d1e7bea8 100644
--- a/executor/kv/kv_executor.cpp
+++ b/executor/kv/kv_executor.cpp
@@ -135,16 +135,17 @@ std::unique_ptr<std::string> KVExecutor::ExecuteData(
}
void KVExecutor::Set(const std::string& key, const std::string& value) {
- LOG(ERROR)<<" set key:"<<key;
- storage_->SetValue(key, value);
+ storage_->SetValueWithSeq(key, value, seq_);  // Return code is discarded. seq_ is presumably the TransactionManager member set by ExecuteBatchWithSeq — confirm.
}
std::string KVExecutor::Get(const std::string& key) {
LOG(ERROR)<<" get key:"<<key;
- return storage_->GetValue(key);
+ return storage_->GetValueWithSeq(key,0).first;  // seq 0: ResLevelDB::GetValueWithSeq returns the newest entry for 0; only the value half of the pair is returned.
}
-std::string KVExecutor::GetAllValues() { return storage_->GetAllValues(); }
+std::string KVExecutor::GetAllValues() {
+ return "";  // Storage::GetAllValues is removed by this change, so this now always yields an empty string.
+}
// Get values on a range of keys
std::string KVExecutor::GetRange(const std::string& min_key,
@@ -165,14 +166,7 @@ void KVExecutor::GetWithVersion(const std::string& key,
int version,
}
void KVExecutor::GetAllItems(Items* items) {
- const std::map<std::string, std::pair<std::string, int>>& ret =
- storage_->GetAllItems();
- for (auto it : ret) {
- Item* item = items->add_item();
- item->set_key(it.first);
- item->mutable_value_info()->set_value(it.second.first);
- item->mutable_value_info()->set_version(it.second.second);
- }
+ return;  // NOTE(review): `items` is no longer populated after this change — confirm callers expect an empty result.
}
void KVExecutor::GetKeyRange(const std::string& min_key,
diff --git a/platform/consensus/execution/transaction_executor.cpp
b/platform/consensus/execution/transaction_executor.cpp
index d2661828..c601cc18 100644
--- a/platform/consensus/execution/transaction_executor.cpp
+++ b/platform/consensus/execution/transaction_executor.cpp
@@ -246,7 +246,7 @@ void
TransactionExecutor::OnlyExecute(std::unique_ptr<Request> request) {
std::unique_ptr<BatchUserResponse> response;
global_stats_->GetTransactionDetails(batch_request);
if (transaction_manager_) {
- response = transaction_manager_->ExecuteBatch(batch_request);
+ response = transaction_manager_->ExecuteBatchWithSeq(request->seq(),
batch_request);
}
// global_stats_->IncTotalRequest(batch_request.user_requests_size());
@@ -291,7 +291,7 @@ void TransactionExecutor::Execute(std::unique_ptr<Request>
request,
global_stats_->GetTransactionDetails(*batch_request_p);
if (transaction_manager_ && need_execute) {
if (execute_thread_num_ == 1) {
- response = transaction_manager_->ExecuteBatch(*batch_request_p);
+ response = transaction_manager_->ExecuteBatchWithSeq(request->seq(),
*batch_request_p);
} else {
std::vector<std::unique_ptr<std::string>> response_v;
@@ -308,10 +308,10 @@ void
TransactionExecutor::Execute(std::unique_ptr<Request> request,
WaitForExecute(request->seq());
if(data_p->empty() || (*data_p)[0] == nullptr){
- response =
transaction_manager_->ExecuteBatch(*batch_request_p);
+ response =
transaction_manager_->ExecuteBatchWithSeq(request->seq(), *batch_request_p);
}
else {
- response_v =
transaction_manager_->ExecuteBatchData(*data_p);
+ response_v =
transaction_manager_->ExecuteBatchDataWithSeq(request->seq(), *data_p);
}
FinishExecute(request->seq());
diff --git a/platform/consensus/ordering/pbft/message_manager.cpp
b/platform/consensus/ordering/pbft/message_manager.cpp
index 44b1c25a..2c8b08cc 100644
--- a/platform/consensus/ordering/pbft/message_manager.cpp
+++ b/platform/consensus/ordering/pbft/message_manager.cpp
@@ -40,19 +40,7 @@ MessageManager::MessageManager(
std::unique_ptr<BatchUserResponse> resp_msg) {
if (request->is_recovery()) {
if (checkpoint_manager_) {
- // checkpoint_manager_->AddCommitData(std::move(request));
- // std::cout<<"In message here"<<std::endl;
- // uint64_t latest_executed_seq =
checkpoint_manager_->latest_executed_seq_;
- // std::string temp_dir = "/tmp";
- // std::string file_path = temp_dir + "/latest_seqnum.txt";
- // std::ofstream log_file(file_path, std::ios::app);
- // // std::ofstream log_file("latest_seqnum.txt");
- // if (!log_file.is_open()) {
- // std::cerr << "Error: Could not open the log file." <<
std::strerror(errno) << std::endl;
- // }
- // log_file << "Lastest_seqnum: " << latest_executed_seq <<
std::endl;
- // log_file.flush();
- // log_file.close();
+ checkpoint_manager_->AddCommitData(std::move(request));
}
return;
}