This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 64abf9055 refactor(local service): use rocksdb API to read/write file (#1619)
64abf9055 is described below
commit 64abf905595d8f2ac2520c66b0554c121b01b6bb
Author: Yingchun Lai <[email protected]>
AuthorDate: Tue Sep 26 04:08:43 2023 -0500
refactor(local service): use rocksdb API to read/write file (#1619)
https://github.com/apache/incubator-pegasus/issues/887
There are no functional changes; this only refactors the local service and
its related unit test.
FLAGS_enable_direct_io now also takes effect when the local block service reads files.
---
src/block_service/local/CMakeLists.txt | 2 +-
src/block_service/local/local_service.cpp | 339 ++++++++++++++------------
src/block_service/test/local_service_test.cpp | 64 +++--
3 files changed, 227 insertions(+), 178 deletions(-)
diff --git a/src/block_service/local/CMakeLists.txt b/src/block_service/local/CMakeLists.txt
index 0886bece9..9d830f782 100644
--- a/src/block_service/local/CMakeLists.txt
+++ b/src/block_service/local/CMakeLists.txt
@@ -26,7 +26,7 @@ set(MY_PROJ_SRC "")
#"GLOB" for non - recursive search
set(MY_SRC_SEARCH_MODE "GLOB")
-set(MY_PROJ_LIBS "")
+set(MY_PROJ_LIBS rocksdb)
#Extra files that will be installed
set(MY_BINPLACES "")
diff --git a/src/block_service/local/local_service.cpp b/src/block_service/local/local_service.cpp
index 04cf1cc3d..917596134 100644
--- a/src/block_service/local/local_service.cpp
+++ b/src/block_service/local/local_service.cpp
@@ -15,10 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-#include <errno.h>
-#include <algorithm>
+#include <rocksdb/env.h>
#include <initializer_list>
-#include <istream>
#include <memory>
#include <set>
#include <type_traits>
@@ -27,18 +25,22 @@
#include "local_service.h"
#include "nlohmann/json.hpp"
#include "nlohmann/json_fwd.hpp"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
#include "runtime/task/async_calls.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
-#include "utils/defer.h"
+#include "utils/env.h"
#include "utils/error_code.h"
#include "utils/fail_point.h"
#include "utils/filesystem.h"
+#include "utils/flags.h"
#include "utils/fmt_logging.h"
-#include "utils/safe_strerror_posix.h"
#include "utils/string_view.h"
#include "utils/strings.h"
+DSN_DECLARE_bool(enable_direct_io);
+
namespace dsn {
class task_tracker;
} // namespace dsn
@@ -51,15 +53,13 @@ namespace block_service {
DEFINE_TASK_CODE(LPC_LOCAL_SERVICE_CALL, TASK_PRIORITY_COMMON,
THREAD_POOL_BLOCK_SERVICE)
-bool file_metadata_from_json(std::ifstream &fin, file_metadata &fmeta) noexcept
+bool file_metadata_from_json(const std::string &data, file_metadata &fmeta)
noexcept
{
- std::string data;
- fin >> data;
try {
nlohmann::json::parse(data).get_to(fmeta);
return true;
} catch (nlohmann::json::exception &exp) {
- LOG_WARNING("decode meta data from json failed: {} [{}]", exp.what(),
data);
+ LOG_WARNING("decode metadata from json failed: {} [{}]", exp.what(),
data);
return false;
}
}
@@ -191,7 +191,7 @@ dsn::task_ptr local_service::create_file(const
create_file_request &req,
if (utils::filesystem::file_exists(file_path) &&
utils::filesystem::file_exists(meta_file_path)) {
- LOG_DEBUG("file({}) already exist", file_path);
+ LOG_INFO("file({}) already exist", file_path);
resp.err = f->load_metadata();
}
@@ -268,17 +268,17 @@ error_code local_file_object::load_metadata()
return ERR_OK;
std::string metadata_path = local_service::get_metafile(file_name());
- std::ifstream is(metadata_path, std::ios::in);
- if (!is.is_open()) {
- LOG_WARNING(
- "load meta data from {} failed, err = {}", metadata_path,
utils::safe_strerror(errno));
+ std::string data;
+ auto s = rocksdb::ReadFileToString(rocksdb::Env::Default(), metadata_path,
&data);
+ if (!s.ok()) {
+ LOG_WARNING("read file '{}' failed, err = {}", metadata_path,
s.ToString());
return ERR_FS_INTERNAL;
}
- auto cleanup = dsn::defer([&is]() { is.close(); });
file_metadata meta;
- bool ans = file_metadata_from_json(is, meta);
+ bool ans = file_metadata_from_json(data, meta);
if (!ans) {
+ LOG_WARNING("decode metadata '{}' file content failed", metadata_path);
return ERR_FS_INTERNAL;
}
_size = meta.size;
@@ -292,16 +292,16 @@ error_code local_file_object::store_metadata()
file_metadata meta;
meta.md5 = _md5_value;
meta.size = _size;
-
+ std::string data = nlohmann::json(meta).dump();
std::string metadata_path = local_service::get_metafile(file_name());
- std::ofstream os(metadata_path, std::ios::out | std::ios::trunc);
- if (!os.is_open()) {
- LOG_WARNING(
- "store to metadata file {} failed, err={}", metadata_path,
utils::safe_strerror(errno));
+ auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+ rocksdb::Slice(data),
+ metadata_path,
+ /* should_sync */ true);
+ if (!s.ok()) {
+ LOG_WARNING("store to metadata file {} failed, err={}", metadata_path,
s.ToString());
return ERR_FS_INTERNAL;
}
- auto cleanup = dsn::defer([&os]() { os.close(); });
- os << nlohmann::json(meta);
return ERR_OK;
}
@@ -337,24 +337,36 @@ dsn::task_ptr local_file_object::write(const
write_request &req,
}
if (resp.err == ERR_OK) {
- LOG_DEBUG("start write file, file = {}", file_name());
+ LOG_INFO("start write file, file = {}", file_name());
+
+ do {
+ auto s = rocksdb::WriteStringToFile(
+ rocksdb::Env::Default(),
+ rocksdb::Slice(req.buffer.data(), req.buffer.length()),
+ file_name(),
+ /* should_sync */ true);
+ if (!s.ok()) {
+ LOG_WARNING("write file '{}' failed, err = {}",
file_name(), s.ToString());
+ resp.err = ERR_FS_INTERNAL;
+ break;
+ }
- std::ofstream fout(file_name(), std::ifstream::out |
std::ifstream::trunc);
- if (!fout.is_open()) {
- resp.err = ERR_FS_INTERNAL;
- } else {
- fout.write(req.buffer.data(), req.buffer.length());
resp.written_size = req.buffer.length();
- fout.close();
// Currently we calc the meta data from source data, which
save the io bandwidth
// a lot, but it is somewhat not correct.
_size = resp.written_size;
_md5_value = utils::string_md5(req.buffer.data(),
req.buffer.length());
+ // TODO(yingchun): make store_metadata as a local function, do
not depend on the
+ // member variables (i.e. _size and _md5_value).
+ auto err = store_metadata();
+ if (err != ERR_OK) {
+ LOG_WARNING("store_metadata failed");
+ resp.err = ERR_FS_INTERNAL;
+ break;
+ }
_has_meta_synced = true;
-
- store_metadata();
- }
+ } while (false);
}
tsk->enqueue_with(resp);
release_ref();
@@ -375,38 +387,65 @@ dsn::task_ptr local_file_object::read(const read_request
&req,
auto read_func = [this, req, tsk]() {
read_response resp;
- resp.err = ERR_OK;
- if (!utils::filesystem::file_exists(file_name()) ||
-
!utils::filesystem::file_exists(local_service::get_metafile(file_name()))) {
- resp.err = ERR_OBJECT_NOT_FOUND;
- } else {
- if ((resp.err = load_metadata()) != ERR_OK) {
- LOG_WARNING("load meta data of {} failed", file_name());
+ do {
+ if (!utils::filesystem::file_exists(file_name()) ||
+
!utils::filesystem::file_exists(local_service::get_metafile(file_name()))) {
+ LOG_WARNING("data file '{}' or metadata file '{}' not exist",
+ file_name(),
+ local_service::get_metafile(file_name()));
+ resp.err = ERR_OBJECT_NOT_FOUND;
+ break;
+ }
+
+ resp.err = load_metadata();
+ if (resp.err != ERR_OK) {
+ LOG_WARNING("load metadata of {} failed", file_name());
+ break;
+ }
+
+ int64_t file_sz = _size;
+ int64_t total_sz = 0;
+ if (req.remote_length == -1 || req.remote_length + req.remote_pos
> file_sz) {
+ total_sz = file_sz - req.remote_pos;
} else {
- int64_t file_sz = _size;
- int64_t total_sz = 0;
- if (req.remote_length == -1 || req.remote_length +
req.remote_pos > file_sz) {
- total_sz = file_sz - req.remote_pos;
- } else {
- total_sz = req.remote_length;
- }
+ total_sz = req.remote_length;
+ }
- LOG_DEBUG("read file({}), size = {}", file_name(), total_sz);
- std::string buf;
- buf.resize(total_sz + 1);
- std::ifstream fin(file_name(), std::ifstream::in);
- if (!fin.is_open()) {
- resp.err = ERR_FS_INTERNAL;
- } else {
- fin.seekg(static_cast<int64_t>(req.remote_pos), fin.beg);
- fin.read((char *)buf.c_str(), total_sz);
- buf[fin.gcount()] = '\0';
- resp.buffer = blob::create_from_bytes(std::move(buf));
- }
- fin.close();
+ LOG_INFO("start to read file '{}', offset = {}, size = {}",
+ file_name(),
+ req.remote_pos,
+ total_sz);
+ rocksdb::EnvOptions env_options;
+ env_options.use_direct_reads = FLAGS_enable_direct_io;
+ std::unique_ptr<rocksdb::SequentialFile> sfile;
+ auto s = rocksdb::Env::Default()->NewSequentialFile(file_name(),
&sfile, env_options);
+ if (!s.ok()) {
+ LOG_WARNING("open file '{}' failed, err = {}", file_name(),
s.ToString());
+ resp.err = ERR_FS_INTERNAL;
+ break;
}
- }
+ s = sfile->Skip(req.remote_pos);
+ if (!s.ok()) {
+ LOG_WARNING(
+ "skip '{}' for {} failed, err = {}", file_name(),
req.remote_pos, s.ToString());
+ resp.err = ERR_FS_INTERNAL;
+ break;
+ }
+
+ rocksdb::Slice result;
+ std::string buf;
+ buf.resize(total_sz + 1);
+ s = sfile->Read(total_sz, &result, buf.data());
+ if (!s.ok()) {
+ LOG_WARNING("read file '{}' failed, err = {}", file_name(),
s.ToString());
+ resp.err = ERR_FS_INTERNAL;
+ break;
+ }
+
+ buf[result.size()] = 0;
+ resp.buffer = blob::create_from_bytes(std::move(buf));
+ } while (false);
tsk->enqueue_with(resp);
release_ref();
};
@@ -424,58 +463,50 @@ dsn::task_ptr local_file_object::upload(const
upload_request &req,
upload_future_ptr tsk(new upload_future(code, cb, 0));
tsk->set_tracker(tracker);
auto upload_file_func = [this, req, tsk]() {
- upload_response resp;
- resp.err = ERR_OK;
- std::ifstream fin(req.input_local_name, std::ios_base::in);
- if (!fin.is_open()) {
- LOG_WARNING("open source file {} for read failed, err({})",
- req.input_local_name,
- utils::safe_strerror(errno));
- resp.err = ERR_FILE_OPERATION_FAILED;
- }
+ LOG_INFO("start to upload from '{}' to '{}'", req.input_local_name,
file_name());
- utils::filesystem::create_file(file_name());
- std::ofstream fout(file_name(), std::ios_base::out |
std::ios_base::trunc);
- if (!fout.is_open()) {
- LOG_WARNING("open target file {} for write failed, err({})",
- file_name(),
- utils::safe_strerror(errno));
- resp.err = ERR_FS_INTERNAL;
- }
-
- if (resp.err == ERR_OK) {
- LOG_DEBUG("start to transfer from src_file({}) to dst_file({})",
- req.input_local_name,
- file_name());
- int64_t total_sz = 0;
- char buf[max_length] = {'\0'};
- while (!fin.eof()) {
- fin.read(buf, max_length);
- total_sz += fin.gcount();
- fout.write(buf, fin.gcount());
+ upload_response resp;
+ do {
+ // Create the directory.
+ std::string path =
dsn::utils::filesystem::remove_file_name(file_name());
+ if (!dsn::utils::filesystem::create_directory(path)) {
+ LOG_WARNING("create directory '{}' failed", path);
+ resp.err = ERR_FILE_OPERATION_FAILED;
+ break;
}
- LOG_DEBUG("finish upload file, file = {}, total_size = {}",
file_name(), total_sz);
- fout.close();
- fin.close();
-
- resp.uploaded_size = static_cast<uint64_t>(total_sz);
- // calc the md5sum by source file for simplicity
- _size = total_sz;
- error_code res = utils::filesystem::md5sum(req.input_local_name,
_md5_value);
- if (res == dsn::ERR_OK) {
- _has_meta_synced = true;
- store_metadata();
- } else {
+ uint64_t file_size;
+ auto s = dsn::utils::copy_file(req.input_local_name, file_name(),
&file_size);
+ if (!s.ok()) {
+ LOG_WARNING("upload from '{}' to '{}' failed, err = {}",
+ req.input_local_name,
+ file_name(),
+ s.ToString());
+ resp.err = ERR_FILE_OPERATION_FAILED;
+ break;
+ }
+ LOG_INFO("finish to upload from '{}' to '{}', size = {}",
+ req.input_local_name,
+ file_name(),
+ file_size);
+
+ resp.uploaded_size = file_size;
+ _size = file_size;
+ auto res = utils::filesystem::md5sum(file_name(), _md5_value);
+ if (res != dsn::ERR_OK) {
+ LOG_WARNING("calculate md5sum for '{}' failed", file_name());
resp.err = ERR_FS_INTERNAL;
+ break;
}
- } else {
- if (fin.is_open())
- fin.close();
- if (fout.is_open())
- fout.close();
- }
+ auto err = store_metadata();
+ if (err != ERR_OK) {
+ LOG_ERROR("store_metadata failed");
+ resp.err = ERR_FS_INTERNAL;
+ break;
+ }
+ _has_meta_synced = true;
+ } while (false);
tsk->enqueue_with(resp);
release_ref();
};
@@ -497,65 +528,61 @@ dsn::task_ptr local_file_object::download(const
download_request &req,
download_response resp;
resp.err = ERR_OK;
std::string target_file = req.output_local_name;
- if (target_file.empty()) {
- LOG_ERROR(
- "download {} failed, because target name({}) is invalid",
file_name(), target_file);
- resp.err = ERR_INVALID_PARAMETERS;
- }
- if (resp.err == ERR_OK && !_has_meta_synced) {
- if (!utils::filesystem::file_exists(file_name()) ||
-
!utils::filesystem::file_exists(local_service::get_metafile(file_name()))) {
- resp.err = ERR_OBJECT_NOT_FOUND;
+ do {
+ if (target_file.empty()) {
+ LOG_WARNING("download {} failed, because target name({}) is
invalid",
+ file_name(),
+ target_file);
+ resp.err = ERR_INVALID_PARAMETERS;
+ break;
}
- }
- if (resp.err == ERR_OK) {
- std::ifstream fin(file_name(), std::ifstream::in);
- if (!fin.is_open()) {
- LOG_ERROR("open block file({}) failed, err({})",
- file_name(),
- utils::safe_strerror(errno));
- resp.err = ERR_FS_INTERNAL;
+ if (!_has_meta_synced) {
+ if (!utils::filesystem::file_exists(file_name()) ||
+
!utils::filesystem::file_exists(local_service::get_metafile(file_name()))) {
+ LOG_WARNING("file '{}' or metadata file '{}' not found",
+ file_name(),
+ local_service::get_metafile(file_name()));
+ resp.err = ERR_OBJECT_NOT_FOUND;
+ break;
+ }
}
- std::ofstream fout(target_file, std::ios_base::out |
std::ios_base::trunc);
- if (!fout.is_open()) {
- if (fin.is_open())
- fin.close();
- LOG_ERROR("open target file({}) failed, err({})",
- target_file,
- utils::safe_strerror(errno));
+ LOG_INFO("start to download from '{}' to '{}'", file_name(),
target_file);
+
+ // Create the directory.
+ std::string path =
dsn::utils::filesystem::remove_file_name(file_name());
+ if (!dsn::utils::filesystem::create_directory(path)) {
+ LOG_WARNING("create directory '{}' failed", path);
resp.err = ERR_FILE_OPERATION_FAILED;
+ break;
}
- if (resp.err == ERR_OK) {
- LOG_DEBUG(
- "start to transfer, src_file({}), dst_file({})",
file_name(), target_file);
- int64_t total_sz = 0;
- char buf[max_length] = {'\0'};
- while (!fin.eof()) {
- fin.read(buf, max_length);
- total_sz += fin.gcount();
- fout.write(buf, fin.gcount());
- }
- LOG_DEBUG("finish download file({}), total_size = {}",
target_file, total_sz);
- fout.close();
- fin.close();
- resp.downloaded_size = static_cast<uint64_t>(total_sz);
-
- _size = total_sz;
- if ((resp.err = utils::filesystem::md5sum(target_file,
_md5_value)) != ERR_OK) {
- LOG_WARNING("download {} failed when calculate the md5sum
of {}",
- file_name(),
- target_file);
- } else {
- _has_meta_synced = true;
- resp.file_md5 = _md5_value;
- }
+ uint64_t file_size;
+ auto s = dsn::utils::copy_file(file_name(), target_file,
&file_size);
+ if (!s.ok()) {
+ LOG_WARNING("download from '{}' to '{}' failed, err = {}",
+ file_name(),
+ target_file,
+ s.ToString());
+ resp.err = ERR_FILE_OPERATION_FAILED;
+ break;
+ }
+
+ auto res = utils::filesystem::md5sum(target_file, _md5_value);
+ if (res != dsn::ERR_OK) {
+ LOG_WARNING("calculate md5sum for {} failed", target_file);
+ resp.err = ERR_FILE_OPERATION_FAILED;
+ break;
}
- }
+ LOG_INFO("finish download file '{}', size = {}", target_file,
file_size);
+ resp.downloaded_size = file_size;
+ resp.file_md5 = _md5_value;
+ _size = file_size;
+ _has_meta_synced = true;
+ } while (false);
tsk->enqueue_with(resp);
release_ref();
};
diff --git a/src/block_service/test/local_service_test.cpp b/src/block_service/test/local_service_test.cpp
index 72a201237..e4e6aeb2e 100644
--- a/src/block_service/test/local_service_test.cpp
+++ b/src/block_service/test/local_service_test.cpp
@@ -19,17 +19,23 @@
#include <boost/filesystem/operations.hpp>
// IWYU pragma: no_include <gtest/gtest-message.h>
+// IWYU pragma: no_include <gtest/gtest-param-test.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
#include <nlohmann/detail/json_ref.hpp>
#include <nlohmann/json.hpp>
#include <nlohmann/json_fwd.hpp>
-#include <fstream>
+#include <rocksdb/env.h>
+#include <rocksdb/slice.h>
+#include <rocksdb/status.h>
#include <initializer_list>
+#include <map>
#include <stdexcept>
+#include <string>
#include <vector>
#include "block_service/local/local_service.h"
+#include "test_util/test_util.h"
#include "utils/error_code.h"
namespace dsn {
@@ -37,53 +43,69 @@ namespace dist {
namespace block_service {
// Simple tests for nlohmann::json serialization, via
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE.
+class local_service_test : public pegasus::encrypt_data_test_base
+{
+};
+
+// TODO(yingchun): ENCRYPTION: add enable encryption test.
+INSTANTIATE_TEST_CASE_P(, local_service_test, ::testing::Values(false));
-TEST(local_service, store_metadata)
+TEST_P(local_service_test, store_metadata)
{
local_file_object file("a.txt");
error_code ec = file.store_metadata();
- ASSERT_EQ(ec, ERR_OK);
+ ASSERT_EQ(ERR_OK, ec);
auto meta_file_path = local_service::get_metafile(file.file_name());
ASSERT_TRUE(boost::filesystem::exists(meta_file_path));
- std::ifstream ifs(meta_file_path);
- nlohmann::json j;
- ifs >> j;
- ASSERT_EQ(j["md5"], "");
- ASSERT_EQ(j["size"], 0);
+ std::string data;
+ auto s = rocksdb::ReadFileToString(rocksdb::Env::Default(),
meta_file_path, &data);
+ ASSERT_TRUE(s.ok()) << s.ToString();
+
+ nlohmann::json j = nlohmann::json::parse(data);
+ ASSERT_EQ("", j["md5"]);
+ ASSERT_EQ(0, j["size"]);
}
-TEST(local_service, load_metadata)
+TEST_P(local_service_test, load_metadata)
{
local_file_object file("a.txt");
auto meta_file_path = local_service::get_metafile(file.file_name());
{
- std::ofstream ofs(meta_file_path);
nlohmann::json j({{"md5", "abcde"}, {"size", 5}});
- ofs << j;
- ofs.close();
+ std::string data = j.dump();
+ auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+ rocksdb::Slice(data),
+ meta_file_path,
+ /* should_sync */ true);
+ ASSERT_TRUE(s.ok()) << s.ToString();
- ASSERT_EQ(file.load_metadata(), ERR_OK);
- ASSERT_EQ(file.get_md5sum(), "abcde");
- ASSERT_EQ(file.get_size(), 5);
+ ASSERT_EQ(ERR_OK, file.load_metadata());
+ ASSERT_EQ("abcde", file.get_md5sum());
+ ASSERT_EQ(5, file.get_size());
}
{
- std::ofstream ofs(meta_file_path);
- ofs << "invalid json string";
- ofs.close();
+ auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+ rocksdb::Slice("invalid json
string"),
+ meta_file_path,
+ /* should_sync */ true);
+ ASSERT_TRUE(s.ok()) << s.ToString();
local_file_object file2("a.txt");
ASSERT_EQ(file2.load_metadata(), ERR_FS_INTERNAL);
}
{
- std::ofstream ofs(meta_file_path);
nlohmann::json j({{"md5", "abcde"}, {"no such key", "illegal"}});
- ofs << j;
- ofs.close();
+ std::string data = j.dump();
+ auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+ rocksdb::Slice(data),
+ meta_file_path,
+ /* should_sync */ true);
+ ASSERT_TRUE(s.ok()) << s.ToString();
local_file_object file2("a.txt");
ASSERT_EQ(file2.load_metadata(), ERR_FS_INTERNAL);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]