This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 58ac76575 refactor(file_system): use rocksdb API to read/write file (#1624)
58ac76575 is described below
commit 58ac765758bc616b17568f7ae02988c37224126b
Author: Yingchun Lai <[email protected]>
AuthorDate: Wed Sep 27 12:03:47 2023 -0500
refactor(file_system): use rocksdb API to read/write file (#1624)
https://github.com/apache/incubator-pegasus/issues/887
There are no functional changes; this only refactors the file read/write
methods of the filesystem module.
---
src/replica/bulk_load/replica_bulk_loader.cpp | 7 +-
src/replica/replica_restore.cpp | 4 +-
src/server/pegasus_write_service_impl.h | 4 +-
src/utils/filesystem.cpp | 154 ++++++++++++++------------
src/utils/filesystem.h | 13 ++-
src/utils/test/file_system_test.cpp | 55 ++++++++-
src/utils/test/file_utils.cpp | 9 +-
7 files changed, 156 insertions(+), 90 deletions(-)
diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp
index 60f3e5a04..5261e4183 100644
--- a/src/replica/bulk_load/replica_bulk_loader.cpp
+++ b/src/replica/bulk_load/replica_bulk_loader.cpp
@@ -44,6 +44,7 @@
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/chrono_literals.h"
+#include "utils/env.h"
#include "utils/fail_point.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
@@ -497,7 +498,8 @@ void replica_bulk_loader::download_sst_file(const
std::string &remote_dir,
// We are not sure if the file was cached by system. And we couldn't
// afford the io overhead which is cased by reading file in
verify_file(),
// so if file exist we just verify file size
- if (utils::filesystem::verify_file_size(file_name, f_meta.size)) {
+ if (utils::filesystem::verify_file_size(
+ file_name, utils::FileDataType::kSensitive, f_meta.size)) {
// local file exist and is verified
ec = ERR_OK;
f_size = f_meta.size;
@@ -520,7 +522,8 @@ void replica_bulk_loader::download_sst_file(const
std::string &remote_dir,
if (ec == ERR_OK && !verified) {
if (!f_meta.md5.empty() && f_md5 != f_meta.md5) {
ec = ERR_CORRUPTION;
- } else if (!utils::filesystem::verify_file_size(file_name,
f_meta.size)) {
+ } else if (!utils::filesystem::verify_file_size(
+ file_name, utils::FileDataType::kSensitive,
f_meta.size)) {
ec = ERR_CORRUPTION;
}
}
diff --git a/src/replica/replica_restore.cpp b/src/replica/replica_restore.cpp
index 7236f31a1..f2494482f 100644
--- a/src/replica/replica_restore.cpp
+++ b/src/replica/replica_restore.cpp
@@ -51,6 +51,7 @@
#include "runtime/task/task_tracker.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
+#include "utils/env.h"
#include "utils/error_code.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
@@ -145,7 +146,8 @@ error_code replica::download_checkpoint(const
configuration_restore_request &req
const std::string file_name =
utils::filesystem::path_combine(local_chkpt_dir,
f_meta.name);
if (download_err == ERR_OK || download_err ==
ERR_PATH_ALREADY_EXIST) {
- if (!utils::filesystem::verify_file(file_name, f_meta.md5,
f_meta.size)) {
+ if (!utils::filesystem::verify_file(
+ file_name, utils::FileDataType::kSensitive,
f_meta.md5, f_meta.size)) {
download_err = ERR_CORRUPTION;
} else if (download_err == ERR_PATH_ALREADY_EXIST) {
download_err = ERR_OK;
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index ff9f2fe83..69b71c6ba 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -29,6 +29,7 @@
#include "pegasus_write_service.h"
#include "rocksdb_wrapper.h"
#include "utils/defer.h"
+#include "utils/env.h"
#include "utils/filesystem.h"
#include "utils/string_conv.h"
#include "utils/strings.h"
@@ -76,7 +77,8 @@ inline dsn::error_code get_external_files_path(const
std::string &bulk_load_dir,
for (const auto &f_meta : metadata.files) {
const auto &file_name =
dsn::utils::filesystem::path_combine(bulk_load_dir, f_meta.name);
if (verify_before_ingest &&
- !dsn::utils::filesystem::verify_file(file_name, f_meta.md5,
f_meta.size)) {
+ !dsn::utils::filesystem::verify_file(
+ file_name, dsn::utils::FileDataType::kSensitive, f_meta.md5,
f_meta.size)) {
break;
}
files_path.emplace_back(file_name);
diff --git a/src/utils/filesystem.cpp b/src/utils/filesystem.cpp
index f2fa59ef8..2f80a51d5 100644
--- a/src/utils/filesystem.cpp
+++ b/src/utils/filesystem.cpp
@@ -33,24 +33,23 @@
* xxxx-xx-xx, author, fix bug about xxx
*/
+#include <boost/filesystem/operations.hpp>
+#include <boost/system/error_code.hpp>
#include <errno.h>
-#include <fcntl.h>
+#include <fmt/core.h>
#include <ftw.h>
#include <limits.h>
#include <openssl/md5.h>
+#include <rocksdb/env.h>
+#include <rocksdb/slice.h>
+#include <rocksdb/status.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
// IWYU pragma: no_include <bits/struct_stat.h>
#include <unistd.h>
-
#include <istream>
-
-#include <boost/filesystem/operations.hpp>
-#include <boost/system/error_code.hpp>
-#include <fmt/core.h>
-#include <rocksdb/env.h>
-#include <rocksdb/status.h>
+#include <memory>
#include "utils/defer.h"
#include "utils/env.h"
@@ -60,7 +59,6 @@
#include "utils/ports.h"
#include "utils/safe_strerror_posix.h"
#include "utils/string_view.h"
-#include "utils/strings.h"
#define getcwd_ getcwd
#define rmdir_ rmdir
@@ -417,7 +415,6 @@ bool deprecated_file_size(const std::string &path, int64_t
&sz)
}
sz = st.st_size;
-
return true;
}
@@ -517,53 +514,28 @@ out_error:
bool create_file(const std::string &path)
{
- size_t pos;
std::string npath;
- int fd;
- int mode;
- int err;
-
- if (path.empty()) {
- return false;
- }
-
- if (_FS_ISSEP(path.back())) {
- return false;
- }
-
- err = get_normalized_path(path, npath);
+ int err = get_normalized_path(path, npath);
if (err != 0) {
return false;
}
- if (dsn::utils::filesystem::path_exists_internal(npath, FTW_F)) {
- return true;
- }
-
- if (dsn::utils::filesystem::path_exists_internal(npath, FTW_D)) {
- return false;
- }
-
- pos = npath.find_last_of("\\/");
+ auto pos = npath.find_last_of("\\/");
if ((pos != std::string::npos) && (pos > 0)) {
auto ppath = npath.substr(0, pos);
if (!dsn::utils::filesystem::create_directory(ppath)) {
+ LOG_WARNING("fail to create directory {}", ppath);
return false;
}
}
- mode = 0775;
- fd = ::creat(npath.c_str(), mode);
- if (fd == -1) {
- err = errno;
- LOG_WARNING("create_file {} failed, err = {}", path,
safe_strerror(err));
+ std::unique_ptr<rocksdb::WritableFile> wfile;
+ auto s = rocksdb::Env::Default()->ReopenWritableFile(path, &wfile,
rocksdb::EnvOptions());
+ if (dsn_unlikely(!s.ok())) {
+ LOG_WARNING("fail to create file {}, err={}", path, s.ToString());
return false;
}
- if (::close_(fd) != 0) {
- LOG_WARNING("create_file {}, failed to close the file handle.", path);
- }
-
return true;
}
@@ -745,6 +717,54 @@ bool link_file(const std::string &src, const std::string
&target)
}
error_code md5sum(const std::string &file_path, /*out*/ std::string &result)
+{
+ result.clear();
+ if (!::dsn::utils::filesystem::file_exists(file_path)) {
+ LOG_ERROR("md5sum error: file {} not exist", file_path);
+ return ERR_OBJECT_NOT_FOUND;
+ }
+
+ std::unique_ptr<rocksdb::SequentialFile> sfile;
+ auto s = rocksdb::Env::Default()->NewSequentialFile(file_path, &sfile,
rocksdb::EnvOptions());
+ if (!sfile) {
+ LOG_ERROR("md5sum error: open file {} failed, err={}", file_path,
s.ToString());
+ return ERR_FILE_OPERATION_FAILED;
+ }
+
+ const int64_t kBufferSize = 4096;
+ char buf[kBufferSize];
+ unsigned char out[MD5_DIGEST_LENGTH] = {0};
+ MD5_CTX c;
+ CHECK_EQ(1, MD5_Init(&c));
+ while (true) {
+ rocksdb::Slice res;
+ s = sfile->Read(kBufferSize, &res, buf);
+ if (!s.ok()) {
+ MD5_Final(out, &c);
+ LOG_ERROR("md5sum error: read file {} failed, err={}", file_path,
s.ToString());
+ return ERR_FILE_OPERATION_FAILED;
+ }
+ if (res.empty()) {
+ break;
+ }
+ CHECK_EQ(1, MD5_Update(&c, buf, res.size()));
+ if (res.size() < kBufferSize) {
+ break;
+ }
+ }
+ CHECK_EQ(1, MD5_Final(out, &c));
+
+ char str[MD5_DIGEST_LENGTH * 2 + 1];
+ str[MD5_DIGEST_LENGTH * 2] = 0;
+ for (int n = 0; n < MD5_DIGEST_LENGTH; n++) {
+ sprintf(str + n + n, "%02x", out[n]);
+ }
+ result.assign(str);
+
+ return ERR_OK;
+}
+
+error_code deprecated_md5sum(const std::string &file_path, /*out*/ std::string
&result)
{
result.clear();
// if file not exist, we return ERR_OBJECT_NOT_FOUND
@@ -841,6 +861,7 @@ error_code read_file(const std::string &fname, std::string
&buf)
}
bool verify_file(const std::string &fname,
+ FileDataType type,
const std::string &expected_md5,
const int64_t &expected_fsize)
{
@@ -849,7 +870,7 @@ bool verify_file(const std::string &fname,
return false;
}
int64_t f_size = 0;
- if (!file_size(fname, f_size)) {
+ if (!file_size(fname, type, f_size)) {
LOG_ERROR("verify file({}) failed, becaused failed to get file size",
fname);
return false;
}
@@ -870,14 +891,14 @@ bool verify_file(const std::string &fname,
return true;
}
-bool verify_file_size(const std::string &fname, const int64_t &expected_fsize)
+bool verify_file_size(const std::string &fname, FileDataType type, const
int64_t &expected_fsize)
{
if (!file_exists(fname)) {
LOG_ERROR("file({}) is not existed", fname);
return false;
}
int64_t f_size = 0;
- if (!file_size(fname, f_size)) {
+ if (!file_size(fname, type, f_size)) {
LOG_ERROR("verify file({}) size failed, becaused failed to get file
size", fname);
return false;
}
@@ -891,22 +912,6 @@ bool verify_file_size(const std::string &fname, const
int64_t &expected_fsize)
return true;
}
-bool verify_data_md5(const std::string &fname,
- const char *data,
- const size_t data_size,
- const std::string &expected_md5)
-{
- std::string md5 = string_md5(data, data_size);
- if (md5 != expected_md5) {
- LOG_ERROR("verify data({}) failed, because data damaged, size: md5: {}
VS {}",
- fname,
- md5,
- expected_md5);
- return false;
- }
- return true;
-}
-
bool create_directory(const std::string &path, std::string &absolute_path,
std::string &err_msg)
{
FAIL_POINT_INJECT_F("filesystem_create_directory", [path](string_view str)
{
@@ -952,23 +957,28 @@ bool check_dir_rw(const std::string &path, std::string
&err_msg)
path.find(broken_disk_dir) == std::string::npos;
});
- std::string fname = "read_write_test_file";
- std::string fpath = path_combine(path, fname);
- if (!create_file(fpath)) {
- err_msg = fmt::format("Fail to create test file {}.", fpath);
+ static const std::string kTestValue = "test_value";
+ static const std::string kFname = "read_write_test_file";
+ std::string fpath = path_combine(path, kFname);
+ auto cleanup = defer([&fpath]() { remove_path(fpath); });
+ auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+ rocksdb::Slice(kTestValue),
+ fpath,
+ /* should_sync */ true);
+ if (dsn_unlikely(!s.ok())) {
+ err_msg = fmt::format("fail to write file {}, err={}", fpath,
s.ToString());
return false;
}
- auto cleanup = defer([&fpath]() { remove_path(fpath); });
- std::string value = "test_value";
- if (!write_file(fpath, value)) {
- err_msg = fmt::format("Fail to write file {}.", fpath);
+ std::string read_data;
+ s = rocksdb::ReadFileToString(rocksdb::Env::Default(), fpath, &read_data);
+ if (dsn_unlikely(!s.ok())) {
+ err_msg = fmt::format("fail to read file {}, err={}", fpath,
s.ToString());
return false;
}
- std::string buf;
- if (read_file(fpath, buf) != ERR_OK || buf != value) {
- err_msg = fmt::format("Fail to read file {} or get wrong value({}).",
fpath, buf);
+ if (dsn_unlikely(read_data != kTestValue)) {
+ err_msg = fmt::format("get wrong value '{}' from file {}", read_data,
fpath);
return false;
}
diff --git a/src/utils/filesystem.h b/src/utils/filesystem.h
index 9c5d3efea..440128a97 100644
--- a/src/utils/filesystem.h
+++ b/src/utils/filesystem.h
@@ -65,6 +65,8 @@ enum class FileDataType;
namespace filesystem {
+// TODO(yingchun): Consider using rocksdb APIs to rewrite the following
functions.
+
int get_normalized_path(const std::string &path, std::string &npath);
bool get_absolute_path(const std::string &path1, std::string &path2);
@@ -136,6 +138,7 @@ bool get_disk_space_info(const std::string &path,
disk_space_info &info);
bool link_file(const std::string &src, const std::string &target);
error_code md5sum(const std::string &file_path, /*out*/ std::string &result);
+error_code deprecated_md5sum(const std::string &file_path, /*out*/ std::string
&result);
// return value:
// - <A, B>:
@@ -143,25 +146,23 @@ error_code md5sum(const std::string &file_path, /*out*/
std::string &result);
// B is represent wheter the directory is empty, true means empty,
otherwise false
std::pair<error_code, bool> is_directory_empty(const std::string &dirname);
+// TODO(yingchun): remove it!
error_code read_file(const std::string &fname, /*out*/ std::string &buf);
// compare file metadata calculated by fname with expected md5 and file_size
bool verify_file(const std::string &fname,
+ FileDataType type,
const std::string &expected_md5,
const int64_t &expected_fsize);
-bool verify_file_size(const std::string &fname, const int64_t &expected_fsize);
-
-bool verify_data_md5(const std::string &fname,
- const char *data,
- const size_t data_size,
- const std::string &expected_md5);
+bool verify_file_size(const std::string &fname, FileDataType type, const
int64_t &expected_fsize);
// create driectory and get absolute path
bool create_directory(const std::string &path,
/*out*/ std::string &absolute_path,
/*out*/ std::string &err_msg);
+// TODO(yingchun): remove it!
bool write_file(const std::string &fname, std::string &buf);
// check if directory is readable and writable
diff --git a/src/utils/test/file_system_test.cpp b/src/utils/test/file_system_test.cpp
index ccfc2da2f..f497a908b 100644
--- a/src/utils/test/file_system_test.cpp
+++ b/src/utils/test/file_system_test.cpp
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+// IWYU pragma: no_include <gtest/gtest-param-test.h>
// IWYU pragma: no_include <gtest/gtest-message.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
@@ -26,6 +27,7 @@
#include <string>
#include "utils/env.h"
+#include "utils/error_code.h"
#include "utils/filesystem.h"
#include "utils/flags.h"
@@ -115,18 +117,63 @@ TEST(filesystem_test_p, encrypted_file_size)
ASSERT_EQ(kFileContentSize + kEncryptionHeaderkSize, actual_file_size);
}
+// The old filesystem API doesn't support sensitive files, so skip testing
+// FLAGS_encrypt_data_at_rest=true.
+TEST(filesystem_test, check_new_md5sum)
+{
+ FLAGS_encrypt_data_at_rest = false;
+
+ struct file_info
+ {
+ int64_t size;
+ } tests[]{{4095}, {4096}, {4097}};
+
+ for (const auto &test : tests) {
+ std::string fname = "test_file";
+ // deprecated_md5sum doesn't support kSensitive files, so use
kNonSensitive here.
+ auto s = rocksdb::WriteStringToFile(
+ dsn::utils::PegasusEnv(dsn::utils::FileDataType::kNonSensitive),
+ rocksdb::Slice(std::string(test.size, 'a')),
+ fname,
+ /* should_sync */ true);
+ ASSERT_TRUE(s.ok()) << s.ToString();
+ // Check the file size.
+ int64_t file_fsize;
+ ASSERT_TRUE(file_size(fname, FileDataType::kNonSensitive, file_fsize));
+ ASSERT_EQ(test.size, file_fsize);
+
+ // Get the md5sum.
+ std::string md5sum1;
+ ASSERT_EQ(ERR_OK, md5sum(fname, md5sum1));
+ ASSERT_FALSE(md5sum1.empty());
+
+ // Check the md5sum is repeatable.
+ std::string md5sum2;
+ ASSERT_EQ(ERR_OK, md5sum(fname, md5sum2));
+ ASSERT_EQ(md5sum1, md5sum2);
+
+ // Check the md5sum is the same to deprecated_md5sum.
+ ASSERT_EQ(ERR_OK, deprecated_md5sum(fname, md5sum2));
+ ASSERT_EQ(md5sum1, md5sum2);
+
+ utils::filesystem::remove_path(fname);
+ }
+}
+
TEST(filesystem_test, verify_file_test)
{
+ FLAGS_encrypt_data_at_rest = false;
+
const std::string &fname = "test_file";
std::string expected_md5;
int64_t expected_fsize;
create_file(fname);
md5sum(fname, expected_md5);
- file_size(fname, expected_fsize);
+ ASSERT_TRUE(file_size(fname, FileDataType::kNonSensitive, expected_fsize));
- ASSERT_TRUE(verify_file(fname, expected_md5, expected_fsize));
- ASSERT_FALSE(verify_file(fname, "wrong_md5", 10086));
- ASSERT_FALSE(verify_file("file_not_exists", "wrong_md5", 10086));
+ ASSERT_TRUE(verify_file(fname, FileDataType::kNonSensitive, expected_md5,
expected_fsize));
+ ASSERT_FALSE(verify_file(fname, FileDataType::kNonSensitive, "wrong_md5",
10086));
+ ASSERT_FALSE(verify_file("file_not_exists", FileDataType::kNonSensitive,
"wrong_md5", 10086));
remove_path(fname);
}
diff --git a/src/utils/test/file_utils.cpp b/src/utils/test/file_utils.cpp
index 004e228b4..0cc2173e8 100644
--- a/src/utils/test/file_utils.cpp
+++ b/src/utils/test/file_utils.cpp
@@ -33,6 +33,7 @@
#include <string>
#include <vector>
+#include "utils/env.h"
#include "utils/error_code.h"
#include "utils/filesystem.h"
@@ -683,12 +684,12 @@ static void file_utils_test_file_size()
bool ret;
path = "./file_utils_temp.txt";
- ret = dsn::utils::filesystem::file_size(path, sz);
- EXPECT_TRUE(ret);
- EXPECT_TRUE(sz == 12);
+ ret = dsn::utils::filesystem::file_size(path,
dsn::utils::FileDataType::kNonSensitive, sz);
+ ASSERT_TRUE(ret);
+ ASSERT_EQ(12, sz);
path = "./file_utils_temp2.txt";
- ret = dsn::utils::filesystem::file_size(path, sz);
+ ret = dsn::utils::filesystem::file_size(path,
dsn::utils::FileDataType::kNonSensitive, sz);
EXPECT_FALSE(ret);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]