This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new a1c463d48 refactor(replica): use rocksdb API to read/write file (#1623)
a1c463d48 is described below
commit a1c463d48abe1b0c3f742b700fd7d52c5f456764
Author: Yingchun Lai <[email protected]>
AuthorDate: Tue Sep 26 06:56:19 2023 -0500
refactor(replica): use rocksdb API to read/write file (#1623)
https://github.com/apache/incubator-pegasus/issues/887
There are no functional changes; this commit only refactors the replica
metadata file read/write module.
---
src/replica/replica_restore.cpp | 41 ++++--------
src/replica/replication_app_base.cpp | 122 +++++++++--------------------------
src/replica/replication_app_base.h | 8 +--
3 files changed, 49 insertions(+), 122 deletions(-)
diff --git a/src/replica/replica_restore.cpp b/src/replica/replica_restore.cpp
index 1a3747977..7236f31a1 100644
--- a/src/replica/replica_restore.cpp
+++ b/src/replica/replica_restore.cpp
@@ -17,6 +17,8 @@
#include <boost/cstdint.hpp>
#include <boost/lexical_cast.hpp>
+#include <rocksdb/env.h>
+#include <rocksdb/status.h>
#include <stdint.h>
#include <atomic>
#include <fstream>
@@ -52,7 +54,6 @@
#include "utils/error_code.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
-#include "utils/utils.h"
using namespace dsn::dist::block_service;
@@ -93,41 +94,25 @@ bool replica::remove_useless_file_under_chkpt(const
std::string &chkpt_dir,
return true;
}
-bool replica::read_cold_backup_metadata(const std::string &file,
+bool replica::read_cold_backup_metadata(const std::string &fname,
cold_backup_metadata &backup_metadata)
{
- if (!::dsn::utils::filesystem::file_exists(file)) {
+ if (!::dsn::utils::filesystem::file_exists(fname)) {
LOG_ERROR_PREFIX(
- "checkpoint on remote storage media is damaged, coz file({})
doesn't exist", file);
+ "checkpoint on remote storage media is damaged, coz file({})
doesn't exist", fname);
return false;
}
- int64_t file_sz = 0;
- if (!::dsn::utils::filesystem::file_size(file, file_sz)) {
- LOG_ERROR_PREFIX("get file({}) size failed", file);
- return false;
- }
- std::shared_ptr<char> buf = utils::make_shared_array<char>(file_sz + 1);
- std::ifstream fin(file, std::ifstream::in);
- if (!fin.is_open()) {
- LOG_ERROR_PREFIX("open file({}) failed", file);
+ std::string data;
+ auto s = rocksdb::ReadFileToString(rocksdb::Env::Default(), fname, &data);
+ if (!s.ok()) {
+ LOG_ERROR_PREFIX("read file '{}' failed, err = {}", fname,
s.ToString());
return false;
}
- fin.read(buf.get(), file_sz);
- CHECK_EQ_MSG(file_sz,
- fin.gcount(),
- "{}: read file({}) failed, need {}, but read {}",
- name(),
- file,
- file_sz,
- fin.gcount());
- fin.close();
-
- buf.get()[fin.gcount()] = '\0';
- blob bb;
- bb.assign(std::move(buf), 0, file_sz);
- if (!::dsn::json::json_forwarder<cold_backup_metadata>::decode(bb,
backup_metadata)) {
- LOG_ERROR_PREFIX("file({}) under checkpoint is damaged", file);
+
+ if (!::dsn::json::json_forwarder<cold_backup_metadata>::decode(
+ blob::create_from_bytes(std::move(data)), backup_metadata)) {
+ LOG_ERROR_PREFIX("file({}) under checkpoint is damaged", fname);
return false;
}
return true;
diff --git a/src/replica/replication_app_base.cpp
b/src/replica/replication_app_base.cpp
index 4aa7a8e14..bc84c0c0a 100644
--- a/src/replica/replication_app_base.cpp
+++ b/src/replica/replication_app_base.cpp
@@ -25,16 +25,14 @@
*/
#include <alloca.h>
-#include <fcntl.h>
+#include <rocksdb/env.h>
+#include <rocksdb/slice.h>
#include <rocksdb/status.h>
-#include <algorithm>
#include <fstream>
#include <memory>
#include <utility>
#include <vector>
-#include "aio/aio_task.h"
-#include "aio/file_io.h"
#include "common/bulk_load_common.h"
#include "common/duplication_common.h"
#include "common/replica_envs.h"
@@ -50,7 +48,6 @@
#include "runtime/rpc/serialization.h"
#include "runtime/task/task_code.h"
#include "runtime/task/task_spec.h"
-#include "runtime/task/task_tracker.h"
#include "utils/binary_reader.h"
#include "utils/binary_writer.h"
#include "utils/blob.h"
@@ -60,57 +57,31 @@
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "utils/latency_tracer.h"
-#include "utils/ports.h"
#include "utils/string_view.h"
-#include "utils/threadpool_code.h"
-#include "utils/utils.h"
namespace dsn {
-class disk_file;
namespace replication {
const std::string replica_init_info::kInitInfo = ".init-info";
-DEFINE_TASK_CODE_AIO(LPC_AIO_INFO_WRITE, TASK_PRIORITY_COMMON,
THREAD_POOL_DEFAULT)
-
namespace {
-error_code write_blob_to_file(const std::string &file, const blob &data)
+error_code write_blob_to_file(const std::string &fname, const blob &data)
{
- std::string tmp_file = file + ".tmp";
- disk_file *hfile = file::open(tmp_file.c_str(), O_WRONLY | O_CREAT |
O_BINARY | O_TRUNC, 0666);
+ std::string tmp_fname = fname + ".tmp";
+ auto cleanup = defer([tmp_fname]() {
utils::filesystem::remove_path(tmp_fname); });
+ auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+ rocksdb::Slice(data.data(),
data.length()),
+ tmp_fname,
+ /* should_sync */ true);
LOG_AND_RETURN_NOT_TRUE(
- ERROR, hfile, ERR_FILE_OPERATION_FAILED, "open file {} failed",
tmp_file);
- auto cleanup = defer([tmp_file]() {
utils::filesystem::remove_path(tmp_file); });
-
- error_code err;
- size_t sz = 0;
- task_tracker tracker;
- aio_task_ptr tsk = file::write(hfile,
- data.data(),
- data.length(),
- 0,
- LPC_AIO_INFO_WRITE,
- &tracker,
- [&err, &sz](error_code e, size_t s) {
- err = e;
- sz = s;
- },
- 0);
- CHECK_NOTNULL(tsk, "create file::write task failed");
- tracker.wait_outstanding_tasks();
- file::flush(hfile);
- file::close(hfile);
- LOG_AND_RETURN_NOT_OK(ERROR, err, "write file {} failed", tmp_file);
- CHECK_EQ(data.length(), sz);
- // TODO(yingchun): need fsync too?
+ ERROR, s.ok(), ERR_FILE_OPERATION_FAILED, "write file {} failed",
tmp_fname);
LOG_AND_RETURN_NOT_TRUE(ERROR,
- utils::filesystem::rename_path(tmp_file, file),
+ utils::filesystem::rename_path(tmp_fname, fname),
ERR_FILE_OPERATION_FAILED,
"move file from {} to {} failed",
- tmp_file,
- file);
-
+ tmp_fname,
+ fname);
return ERR_OK;
}
} // namespace
@@ -145,38 +116,23 @@ error_code replica_init_info::store(const std::string
&dir)
return ERR_OK;
}
-error_code replica_init_info::load_json(const std::string &file)
+error_code replica_init_info::load_json(const std::string &fname)
{
- std::ifstream is(file, std::ios::binary);
- LOG_AND_RETURN_NOT_TRUE(
- ERROR, is.is_open(), ERR_FILE_OPERATION_FAILED, "open file {} failed",
file);
-
- int64_t sz = 0;
+ std::string data;
+ auto s = rocksdb::ReadFileToString(rocksdb::Env::Default(), fname, &data);
+ LOG_AND_RETURN_NOT_TRUE(ERROR, s.ok(), ERR_FILE_OPERATION_FAILED, "read
file {} failed", fname);
LOG_AND_RETURN_NOT_TRUE(ERROR,
- utils::filesystem::file_size(std::string(file),
sz),
+ json::json_forwarder<replica_init_info>::decode(
+ blob::create_from_bytes(std::move(data)),
*this),
ERR_FILE_OPERATION_FAILED,
- "get file size of {} failed",
- file);
-
- std::shared_ptr<char> buffer(utils::make_shared_array<char>(sz));
- is.read((char *)buffer.get(), sz);
- LOG_AND_RETURN_NOT_TRUE(
- ERROR, !is.bad(), ERR_FILE_OPERATION_FAILED, "read file {} failed",
file);
- is.close();
-
- LOG_AND_RETURN_NOT_TRUE(
- ERROR,
- json::json_forwarder<replica_init_info>::decode(blob(buffer, sz),
*this),
- ERR_FILE_OPERATION_FAILED,
- "decode json from file {} failed",
- file);
-
+ "decode json from file {} failed",
+ fname);
return ERR_OK;
}
-error_code replica_init_info::store_json(const std::string &file)
+error_code replica_init_info::store_json(const std::string &fname)
{
- return write_blob_to_file(file,
json::json_forwarder<replica_init_info>::encode(*this));
+ return write_blob_to_file(fname,
json::json_forwarder<replica_init_info>::encode(*this));
}
std::string replica_init_info::to_string()
@@ -189,35 +145,21 @@ std::string replica_init_info::to_string()
return oss.str();
}
-error_code replica_app_info::load(const std::string &file)
+error_code replica_app_info::load(const std::string &fname)
{
- std::ifstream is(file, std::ios::binary);
- LOG_AND_RETURN_NOT_TRUE(
- ERROR, is.is_open(), ERR_FILE_OPERATION_FAILED, "open file {} failed",
file);
-
- int64_t sz = 0;
- LOG_AND_RETURN_NOT_TRUE(ERROR,
- utils::filesystem::file_size(std::string(file),
sz),
- ERR_FILE_OPERATION_FAILED,
- "get file size of {} failed",
- file);
-
- std::shared_ptr<char> buffer(utils::make_shared_array<char>(sz));
- is.read((char *)buffer.get(), sz);
- is.close();
-
- binary_reader reader(blob(buffer, sz));
- int magic;
+ std::string data;
+ auto s = rocksdb::ReadFileToString(rocksdb::Env::Default(), fname, &data);
+ LOG_AND_RETURN_NOT_TRUE(ERROR, s.ok(), ERR_FILE_OPERATION_FAILED, "read
file {} failed", fname);
+ binary_reader reader(blob::create_from_bytes(std::move(data)));
+ int magic = 0;
unmarshall(reader, magic, DSF_THRIFT_BINARY);
-
LOG_AND_RETURN_NOT_TRUE(
- ERROR, magic == 0xdeadbeef, ERR_INVALID_DATA, "data in file {} is
invalid (magic)", file);
-
+ ERROR, magic == 0xdeadbeef, ERR_INVALID_DATA, "data in file {} is
invalid (magic)", fname);
unmarshall(reader, *_app, DSF_THRIFT_JSON);
return ERR_OK;
}
-error_code replica_app_info::store(const std::string &file)
+error_code replica_app_info::store(const std::string &fname)
{
binary_writer writer;
int magic = 0xdeadbeef;
@@ -237,7 +179,7 @@ error_code replica_app_info::store(const std::string &file)
marshall(writer, tmp, DSF_THRIFT_JSON);
}
- return write_blob_to_file(file, writer.get_buffer());
+ return write_blob_to_file(fname, writer.get_buffer());
}
/*static*/
diff --git a/src/replica/replication_app_base.h
b/src/replica/replication_app_base.h
index aafd97c38..5ae162a1b 100644
--- a/src/replica/replication_app_base.h
+++ b/src/replica/replication_app_base.h
@@ -76,8 +76,8 @@ public:
std::string to_string();
private:
- error_code load_json(const std::string &file);
- error_code store_json(const std::string &file);
+ error_code load_json(const std::string &fname);
+ error_code store_json(const std::string &fname);
};
class replica_app_info
@@ -87,8 +87,8 @@ private:
public:
replica_app_info(app_info *app) { _app = app; }
- error_code load(const std::string &file);
- error_code store(const std::string &file);
+ error_code load(const std::string &fname);
+ error_code store(const std::string &fname);
};
/// The store engine interface of Pegasus.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]