This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new a1c463d48 refactor(replica): use rocksdb API to read/write file (#1623)
a1c463d48 is described below

commit a1c463d48abe1b0c3f742b700fd7d52c5f456764
Author: Yingchun Lai <[email protected]>
AuthorDate: Tue Sep 26 06:56:19 2023 -0500

    refactor(replica): use rocksdb API to read/write file (#1623)
    
    https://github.com/apache/incubator-pegasus/issues/887
    
    There are no functional changes; this commit only refactors the module
    that reads and writes the replica metadata files.
---
 src/replica/replica_restore.cpp      |  41 ++++--------
 src/replica/replication_app_base.cpp | 122 +++++++++--------------------------
 src/replica/replication_app_base.h   |   8 +--
 3 files changed, 49 insertions(+), 122 deletions(-)

diff --git a/src/replica/replica_restore.cpp b/src/replica/replica_restore.cpp
index 1a3747977..7236f31a1 100644
--- a/src/replica/replica_restore.cpp
+++ b/src/replica/replica_restore.cpp
@@ -17,6 +17,8 @@
 
 #include <boost/cstdint.hpp>
 #include <boost/lexical_cast.hpp>
+#include <rocksdb/env.h>
+#include <rocksdb/status.h>
 #include <stdint.h>
 #include <atomic>
 #include <fstream>
@@ -52,7 +54,6 @@
 #include "utils/error_code.h"
 #include "utils/filesystem.h"
 #include "utils/fmt_logging.h"
-#include "utils/utils.h"
 
 using namespace dsn::dist::block_service;
 
@@ -93,41 +94,25 @@ bool replica::remove_useless_file_under_chkpt(const 
std::string &chkpt_dir,
     return true;
 }
 
-bool replica::read_cold_backup_metadata(const std::string &file,
+bool replica::read_cold_backup_metadata(const std::string &fname,
                                         cold_backup_metadata &backup_metadata)
 {
-    if (!::dsn::utils::filesystem::file_exists(file)) {
+    if (!::dsn::utils::filesystem::file_exists(fname)) {
         LOG_ERROR_PREFIX(
-            "checkpoint on remote storage media is damaged, coz file({}) 
doesn't exist", file);
+            "checkpoint on remote storage media is damaged, coz file({}) 
doesn't exist", fname);
         return false;
     }
-    int64_t file_sz = 0;
-    if (!::dsn::utils::filesystem::file_size(file, file_sz)) {
-        LOG_ERROR_PREFIX("get file({}) size failed", file);
-        return false;
-    }
-    std::shared_ptr<char> buf = utils::make_shared_array<char>(file_sz + 1);
 
-    std::ifstream fin(file, std::ifstream::in);
-    if (!fin.is_open()) {
-        LOG_ERROR_PREFIX("open file({}) failed", file);
+    std::string data;
+    auto s = rocksdb::ReadFileToString(rocksdb::Env::Default(), fname, &data);
+    if (!s.ok()) {
+        LOG_ERROR_PREFIX("read file '{}' failed, err = {}", fname, 
s.ToString());
         return false;
     }
-    fin.read(buf.get(), file_sz);
-    CHECK_EQ_MSG(file_sz,
-                 fin.gcount(),
-                 "{}: read file({}) failed, need {}, but read {}",
-                 name(),
-                 file,
-                 file_sz,
-                 fin.gcount());
-    fin.close();
-
-    buf.get()[fin.gcount()] = '\0';
-    blob bb;
-    bb.assign(std::move(buf), 0, file_sz);
-    if (!::dsn::json::json_forwarder<cold_backup_metadata>::decode(bb, 
backup_metadata)) {
-        LOG_ERROR_PREFIX("file({}) under checkpoint is damaged", file);
+
+    if (!::dsn::json::json_forwarder<cold_backup_metadata>::decode(
+            blob::create_from_bytes(std::move(data)), backup_metadata)) {
+        LOG_ERROR_PREFIX("file({}) under checkpoint is damaged", fname);
         return false;
     }
     return true;
diff --git a/src/replica/replication_app_base.cpp 
b/src/replica/replication_app_base.cpp
index 4aa7a8e14..bc84c0c0a 100644
--- a/src/replica/replication_app_base.cpp
+++ b/src/replica/replication_app_base.cpp
@@ -25,16 +25,14 @@
  */
 
 #include <alloca.h>
-#include <fcntl.h>
+#include <rocksdb/env.h>
+#include <rocksdb/slice.h>
 #include <rocksdb/status.h>
-#include <algorithm>
 #include <fstream>
 #include <memory>
 #include <utility>
 #include <vector>
 
-#include "aio/aio_task.h"
-#include "aio/file_io.h"
 #include "common/bulk_load_common.h"
 #include "common/duplication_common.h"
 #include "common/replica_envs.h"
@@ -50,7 +48,6 @@
 #include "runtime/rpc/serialization.h"
 #include "runtime/task/task_code.h"
 #include "runtime/task/task_spec.h"
-#include "runtime/task/task_tracker.h"
 #include "utils/binary_reader.h"
 #include "utils/binary_writer.h"
 #include "utils/blob.h"
@@ -60,57 +57,31 @@
 #include "utils/filesystem.h"
 #include "utils/fmt_logging.h"
 #include "utils/latency_tracer.h"
-#include "utils/ports.h"
 #include "utils/string_view.h"
-#include "utils/threadpool_code.h"
-#include "utils/utils.h"
 
 namespace dsn {
-class disk_file;
 
 namespace replication {
 
 const std::string replica_init_info::kInitInfo = ".init-info";
 
-DEFINE_TASK_CODE_AIO(LPC_AIO_INFO_WRITE, TASK_PRIORITY_COMMON, 
THREAD_POOL_DEFAULT)
-
 namespace {
-error_code write_blob_to_file(const std::string &file, const blob &data)
+error_code write_blob_to_file(const std::string &fname, const blob &data)
 {
-    std::string tmp_file = file + ".tmp";
-    disk_file *hfile = file::open(tmp_file.c_str(), O_WRONLY | O_CREAT | 
O_BINARY | O_TRUNC, 0666);
+    std::string tmp_fname = fname + ".tmp";
+    auto cleanup = defer([tmp_fname]() { 
utils::filesystem::remove_path(tmp_fname); });
+    auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+                                        rocksdb::Slice(data.data(), 
data.length()),
+                                        tmp_fname,
+                                        /* should_sync */ true);
     LOG_AND_RETURN_NOT_TRUE(
-        ERROR, hfile, ERR_FILE_OPERATION_FAILED, "open file {} failed", 
tmp_file);
-    auto cleanup = defer([tmp_file]() { 
utils::filesystem::remove_path(tmp_file); });
-
-    error_code err;
-    size_t sz = 0;
-    task_tracker tracker;
-    aio_task_ptr tsk = file::write(hfile,
-                                   data.data(),
-                                   data.length(),
-                                   0,
-                                   LPC_AIO_INFO_WRITE,
-                                   &tracker,
-                                   [&err, &sz](error_code e, size_t s) {
-                                       err = e;
-                                       sz = s;
-                                   },
-                                   0);
-    CHECK_NOTNULL(tsk, "create file::write task failed");
-    tracker.wait_outstanding_tasks();
-    file::flush(hfile);
-    file::close(hfile);
-    LOG_AND_RETURN_NOT_OK(ERROR, err, "write file {} failed", tmp_file);
-    CHECK_EQ(data.length(), sz);
-    // TODO(yingchun): need fsync too?
+        ERROR, s.ok(), ERR_FILE_OPERATION_FAILED, "write file {} failed", 
tmp_fname);
     LOG_AND_RETURN_NOT_TRUE(ERROR,
-                            utils::filesystem::rename_path(tmp_file, file),
+                            utils::filesystem::rename_path(tmp_fname, fname),
                             ERR_FILE_OPERATION_FAILED,
                             "move file from {} to {} failed",
-                            tmp_file,
-                            file);
-
+                            tmp_fname,
+                            fname);
     return ERR_OK;
 }
 } // namespace
@@ -145,38 +116,23 @@ error_code replica_init_info::store(const std::string 
&dir)
     return ERR_OK;
 }
 
-error_code replica_init_info::load_json(const std::string &file)
+error_code replica_init_info::load_json(const std::string &fname)
 {
-    std::ifstream is(file, std::ios::binary);
-    LOG_AND_RETURN_NOT_TRUE(
-        ERROR, is.is_open(), ERR_FILE_OPERATION_FAILED, "open file {} failed", 
file);
-
-    int64_t sz = 0;
+    std::string data;
+    auto s = rocksdb::ReadFileToString(rocksdb::Env::Default(), fname, &data);
+    LOG_AND_RETURN_NOT_TRUE(ERROR, s.ok(), ERR_FILE_OPERATION_FAILED, "read 
file {} failed", fname);
     LOG_AND_RETURN_NOT_TRUE(ERROR,
-                            utils::filesystem::file_size(std::string(file), 
sz),
+                            json::json_forwarder<replica_init_info>::decode(
+                                blob::create_from_bytes(std::move(data)), 
*this),
                             ERR_FILE_OPERATION_FAILED,
-                            "get file size of {} failed",
-                            file);
-
-    std::shared_ptr<char> buffer(utils::make_shared_array<char>(sz));
-    is.read((char *)buffer.get(), sz);
-    LOG_AND_RETURN_NOT_TRUE(
-        ERROR, !is.bad(), ERR_FILE_OPERATION_FAILED, "read file {} failed", 
file);
-    is.close();
-
-    LOG_AND_RETURN_NOT_TRUE(
-        ERROR,
-        json::json_forwarder<replica_init_info>::decode(blob(buffer, sz), 
*this),
-        ERR_FILE_OPERATION_FAILED,
-        "decode json from file {} failed",
-        file);
-
+                            "decode json from file {} failed",
+                            fname);
     return ERR_OK;
 }
 
-error_code replica_init_info::store_json(const std::string &file)
+error_code replica_init_info::store_json(const std::string &fname)
 {
-    return write_blob_to_file(file, 
json::json_forwarder<replica_init_info>::encode(*this));
+    return write_blob_to_file(fname, 
json::json_forwarder<replica_init_info>::encode(*this));
 }
 
 std::string replica_init_info::to_string()
@@ -189,35 +145,21 @@ std::string replica_init_info::to_string()
     return oss.str();
 }
 
-error_code replica_app_info::load(const std::string &file)
+error_code replica_app_info::load(const std::string &fname)
 {
-    std::ifstream is(file, std::ios::binary);
-    LOG_AND_RETURN_NOT_TRUE(
-        ERROR, is.is_open(), ERR_FILE_OPERATION_FAILED, "open file {} failed", 
file);
-
-    int64_t sz = 0;
-    LOG_AND_RETURN_NOT_TRUE(ERROR,
-                            utils::filesystem::file_size(std::string(file), 
sz),
-                            ERR_FILE_OPERATION_FAILED,
-                            "get file size of {} failed",
-                            file);
-
-    std::shared_ptr<char> buffer(utils::make_shared_array<char>(sz));
-    is.read((char *)buffer.get(), sz);
-    is.close();
-
-    binary_reader reader(blob(buffer, sz));
-    int magic;
+    std::string data;
+    auto s = rocksdb::ReadFileToString(rocksdb::Env::Default(), fname, &data);
+    LOG_AND_RETURN_NOT_TRUE(ERROR, s.ok(), ERR_FILE_OPERATION_FAILED, "read 
file {} failed", fname);
+    binary_reader reader(blob::create_from_bytes(std::move(data)));
+    int magic = 0;
     unmarshall(reader, magic, DSF_THRIFT_BINARY);
-
     LOG_AND_RETURN_NOT_TRUE(
-        ERROR, magic == 0xdeadbeef, ERR_INVALID_DATA, "data in file {} is 
invalid (magic)", file);
-
+        ERROR, magic == 0xdeadbeef, ERR_INVALID_DATA, "data in file {} is 
invalid (magic)", fname);
     unmarshall(reader, *_app, DSF_THRIFT_JSON);
     return ERR_OK;
 }
 
-error_code replica_app_info::store(const std::string &file)
+error_code replica_app_info::store(const std::string &fname)
 {
     binary_writer writer;
     int magic = 0xdeadbeef;
@@ -237,7 +179,7 @@ error_code replica_app_info::store(const std::string &file)
         marshall(writer, tmp, DSF_THRIFT_JSON);
     }
 
-    return write_blob_to_file(file, writer.get_buffer());
+    return write_blob_to_file(fname, writer.get_buffer());
 }
 
 /*static*/
diff --git a/src/replica/replication_app_base.h 
b/src/replica/replication_app_base.h
index aafd97c38..5ae162a1b 100644
--- a/src/replica/replication_app_base.h
+++ b/src/replica/replication_app_base.h
@@ -76,8 +76,8 @@ public:
     std::string to_string();
 
 private:
-    error_code load_json(const std::string &file);
-    error_code store_json(const std::string &file);
+    error_code load_json(const std::string &fname);
+    error_code store_json(const std::string &fname);
 };
 
 class replica_app_info
@@ -87,8 +87,8 @@ private:
 
 public:
     replica_app_info(app_info *app) { _app = app; }
-    error_code load(const std::string &file);
-    error_code store(const std::string &file);
+    error_code load(const std::string &fname);
+    error_code store(const std::string &fname);
 };
 
 /// The store engine interface of Pegasus.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to