This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 58ac76575 refactor(file_system): use rocksdb API to read/write file (#1624)
58ac76575 is described below

commit 58ac765758bc616b17568f7ae02988c37224126b
Author: Yingchun Lai <[email protected]>
AuthorDate: Wed Sep 27 12:03:47 2023 -0500

    refactor(file_system): use rocksdb API to read/write file (#1624)
    
    https://github.com/apache/incubator-pegasus/issues/887
    
    There are no functional changes; this commit only refactors the file read/write
    methods of the filesystem module.
---
 src/replica/bulk_load/replica_bulk_loader.cpp |   7 +-
 src/replica/replica_restore.cpp               |   4 +-
 src/server/pegasus_write_service_impl.h       |   4 +-
 src/utils/filesystem.cpp                      | 154 ++++++++++++++------------
 src/utils/filesystem.h                        |  13 ++-
 src/utils/test/file_system_test.cpp           |  55 ++++++++-
 src/utils/test/file_utils.cpp                 |   9 +-
 7 files changed, 156 insertions(+), 90 deletions(-)

diff --git a/src/replica/bulk_load/replica_bulk_loader.cpp b/src/replica/bulk_load/replica_bulk_loader.cpp
index 60f3e5a04..5261e4183 100644
--- a/src/replica/bulk_load/replica_bulk_loader.cpp
+++ b/src/replica/bulk_load/replica_bulk_loader.cpp
@@ -44,6 +44,7 @@
 #include "utils/autoref_ptr.h"
 #include "utils/blob.h"
 #include "utils/chrono_literals.h"
+#include "utils/env.h"
 #include "utils/fail_point.h"
 #include "utils/filesystem.h"
 #include "utils/fmt_logging.h"
@@ -497,7 +498,8 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir,
         // We are not sure if the file was cached by system. And we couldn't
         // afford the io overhead which is cased by reading file in verify_file(),
         // so if file exist we just verify file size
-        if (utils::filesystem::verify_file_size(file_name, f_meta.size)) {
+        if (utils::filesystem::verify_file_size(
+                file_name, utils::FileDataType::kSensitive, f_meta.size)) {
             // local file exist and is verified
             ec = ERR_OK;
             f_size = f_meta.size;
@@ -520,7 +522,8 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir,
     if (ec == ERR_OK && !verified) {
         if (!f_meta.md5.empty() && f_md5 != f_meta.md5) {
             ec = ERR_CORRUPTION;
-        } else if (!utils::filesystem::verify_file_size(file_name, f_meta.size)) {
+        } else if (!utils::filesystem::verify_file_size(
+                       file_name, utils::FileDataType::kSensitive, f_meta.size)) {
             ec = ERR_CORRUPTION;
         }
     }
diff --git a/src/replica/replica_restore.cpp b/src/replica/replica_restore.cpp
index 7236f31a1..f2494482f 100644
--- a/src/replica/replica_restore.cpp
+++ b/src/replica/replica_restore.cpp
@@ -51,6 +51,7 @@
 #include "runtime/task/task_tracker.h"
 #include "utils/autoref_ptr.h"
 #include "utils/blob.h"
+#include "utils/env.h"
 #include "utils/error_code.h"
 #include "utils/filesystem.h"
 #include "utils/fmt_logging.h"
@@ -145,7 +146,8 @@ error_code replica::download_checkpoint(const configuration_restore_request &req
                 const std::string file_name =
                     utils::filesystem::path_combine(local_chkpt_dir, f_meta.name);
                 if (download_err == ERR_OK || download_err == ERR_PATH_ALREADY_EXIST) {
-                    if (!utils::filesystem::verify_file(file_name, f_meta.md5, f_meta.size)) {
+                    if (!utils::filesystem::verify_file(
+                            file_name, utils::FileDataType::kSensitive, f_meta.md5, f_meta.size)) {
                         download_err = ERR_CORRUPTION;
                     } else if (download_err == ERR_PATH_ALREADY_EXIST) {
                         download_err = ERR_OK;
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index ff9f2fe83..69b71c6ba 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -29,6 +29,7 @@
 #include "pegasus_write_service.h"
 #include "rocksdb_wrapper.h"
 #include "utils/defer.h"
+#include "utils/env.h"
 #include "utils/filesystem.h"
 #include "utils/string_conv.h"
 #include "utils/strings.h"
@@ -76,7 +77,8 @@ inline dsn::error_code get_external_files_path(const std::string &bulk_load_dir,
     for (const auto &f_meta : metadata.files) {
         const auto &file_name = dsn::utils::filesystem::path_combine(bulk_load_dir, f_meta.name);
         if (verify_before_ingest &&
-            !dsn::utils::filesystem::verify_file(file_name, f_meta.md5, f_meta.size)) {
+            !dsn::utils::filesystem::verify_file(
+                file_name, dsn::utils::FileDataType::kSensitive, f_meta.md5, f_meta.size)) {
             break;
         }
         files_path.emplace_back(file_name);
diff --git a/src/utils/filesystem.cpp b/src/utils/filesystem.cpp
index f2fa59ef8..2f80a51d5 100644
--- a/src/utils/filesystem.cpp
+++ b/src/utils/filesystem.cpp
@@ -33,24 +33,23 @@
  *     xxxx-xx-xx, author, fix bug about xxx
  */
 
+#include <boost/filesystem/operations.hpp>
+#include <boost/system/error_code.hpp>
 #include <errno.h>
-#include <fcntl.h>
+#include <fmt/core.h>
 #include <ftw.h>
 #include <limits.h>
 #include <openssl/md5.h>
+#include <rocksdb/env.h>
+#include <rocksdb/slice.h>
+#include <rocksdb/status.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <sys/stat.h>
 // IWYU pragma: no_include <bits/struct_stat.h>
 #include <unistd.h>
-
 #include <istream>
-
-#include <boost/filesystem/operations.hpp>
-#include <boost/system/error_code.hpp>
-#include <fmt/core.h>
-#include <rocksdb/env.h>
-#include <rocksdb/status.h>
+#include <memory>
 
 #include "utils/defer.h"
 #include "utils/env.h"
@@ -60,7 +59,6 @@
 #include "utils/ports.h"
 #include "utils/safe_strerror_posix.h"
 #include "utils/string_view.h"
-#include "utils/strings.h"
 
 #define getcwd_ getcwd
 #define rmdir_ rmdir
@@ -417,7 +415,6 @@ bool deprecated_file_size(const std::string &path, int64_t &sz)
     }
 
     sz = st.st_size;
-
     return true;
 }
 
@@ -517,53 +514,28 @@ out_error:
 
 bool create_file(const std::string &path)
 {
-    size_t pos;
     std::string npath;
-    int fd;
-    int mode;
-    int err;
-
-    if (path.empty()) {
-        return false;
-    }
-
-    if (_FS_ISSEP(path.back())) {
-        return false;
-    }
-
-    err = get_normalized_path(path, npath);
+    int err = get_normalized_path(path, npath);
     if (err != 0) {
         return false;
     }
 
-    if (dsn::utils::filesystem::path_exists_internal(npath, FTW_F)) {
-        return true;
-    }
-
-    if (dsn::utils::filesystem::path_exists_internal(npath, FTW_D)) {
-        return false;
-    }
-
-    pos = npath.find_last_of("\\/");
+    auto pos = npath.find_last_of("\\/");
     if ((pos != std::string::npos) && (pos > 0)) {
         auto ppath = npath.substr(0, pos);
         if (!dsn::utils::filesystem::create_directory(ppath)) {
+            LOG_WARNING("fail to create directory {}", ppath);
             return false;
         }
     }
 
-    mode = 0775;
-    fd = ::creat(npath.c_str(), mode);
-    if (fd == -1) {
-        err = errno;
-        LOG_WARNING("create_file {} failed, err = {}", path, safe_strerror(err));
+    std::unique_ptr<rocksdb::WritableFile> wfile;
+    auto s = rocksdb::Env::Default()->ReopenWritableFile(path, &wfile, rocksdb::EnvOptions());
+    if (dsn_unlikely(!s.ok())) {
+        LOG_WARNING("fail to create file {}, err={}", path, s.ToString());
         return false;
     }
 
-    if (::close_(fd) != 0) {
-        LOG_WARNING("create_file {}, failed to close the file handle.", path);
-    }
-
     return true;
 }
 
@@ -745,6 +717,54 @@ bool link_file(const std::string &src, const std::string &target)
 }
 
 error_code md5sum(const std::string &file_path, /*out*/ std::string &result)
+{
+    result.clear();
+    if (!::dsn::utils::filesystem::file_exists(file_path)) {
+        LOG_ERROR("md5sum error: file {} not exist", file_path);
+        return ERR_OBJECT_NOT_FOUND;
+    }
+
+    std::unique_ptr<rocksdb::SequentialFile> sfile;
+    auto s = rocksdb::Env::Default()->NewSequentialFile(file_path, &sfile, rocksdb::EnvOptions());
+    if (!sfile) {
+        LOG_ERROR("md5sum error: open file {} failed, err={}", file_path, s.ToString());
+        return ERR_FILE_OPERATION_FAILED;
+    }
+
+    const int64_t kBufferSize = 4096;
+    char buf[kBufferSize];
+    unsigned char out[MD5_DIGEST_LENGTH] = {0};
+    MD5_CTX c;
+    CHECK_EQ(1, MD5_Init(&c));
+    while (true) {
+        rocksdb::Slice res;
+        s = sfile->Read(kBufferSize, &res, buf);
+        if (!s.ok()) {
+            MD5_Final(out, &c);
+            LOG_ERROR("md5sum error: read file {} failed, err={}", file_path, s.ToString());
+            return ERR_FILE_OPERATION_FAILED;
+        }
+        if (res.empty()) {
+            break;
+        }
+        CHECK_EQ(1, MD5_Update(&c, buf, res.size()));
+        if (res.size() < kBufferSize) {
+            break;
+        }
+    }
+    CHECK_EQ(1, MD5_Final(out, &c));
+
+    char str[MD5_DIGEST_LENGTH * 2 + 1];
+    str[MD5_DIGEST_LENGTH * 2] = 0;
+    for (int n = 0; n < MD5_DIGEST_LENGTH; n++) {
+        sprintf(str + n + n, "%02x", out[n]);
+    }
+    result.assign(str);
+
+    return ERR_OK;
+}
+
+error_code deprecated_md5sum(const std::string &file_path, /*out*/ std::string &result)
 {
     result.clear();
     // if file not exist, we return ERR_OBJECT_NOT_FOUND
@@ -841,6 +861,7 @@ error_code read_file(const std::string &fname, std::string &buf)
 }
 
 bool verify_file(const std::string &fname,
+                 FileDataType type,
                  const std::string &expected_md5,
                  const int64_t &expected_fsize)
 {
@@ -849,7 +870,7 @@ bool verify_file(const std::string &fname,
         return false;
     }
     int64_t f_size = 0;
-    if (!file_size(fname, f_size)) {
+    if (!file_size(fname, type, f_size)) {
         LOG_ERROR("verify file({}) failed, becaused failed to get file size", fname);
         return false;
     }
@@ -870,14 +891,14 @@ bool verify_file(const std::string &fname,
     return true;
 }
 
-bool verify_file_size(const std::string &fname, const int64_t &expected_fsize)
+bool verify_file_size(const std::string &fname, FileDataType type, const int64_t &expected_fsize)
 {
     if (!file_exists(fname)) {
         LOG_ERROR("file({}) is not existed", fname);
         return false;
     }
     int64_t f_size = 0;
-    if (!file_size(fname, f_size)) {
+    if (!file_size(fname, type, f_size)) {
         LOG_ERROR("verify file({}) size failed, becaused failed to get file size", fname);
         return false;
     }
@@ -891,22 +912,6 @@ bool verify_file_size(const std::string &fname, const int64_t &expected_fsize)
     return true;
 }
 
-bool verify_data_md5(const std::string &fname,
-                     const char *data,
-                     const size_t data_size,
-                     const std::string &expected_md5)
-{
-    std::string md5 = string_md5(data, data_size);
-    if (md5 != expected_md5) {
-        LOG_ERROR("verify data({}) failed, because data damaged, size: md5: {} VS {}",
-                  fname,
-                  md5,
-                  expected_md5);
-        return false;
-    }
-    return true;
-}
-
 bool create_directory(const std::string &path, std::string &absolute_path, std::string &err_msg)
 {
     FAIL_POINT_INJECT_F("filesystem_create_directory", [path](string_view str) {
@@ -952,23 +957,28 @@ bool check_dir_rw(const std::string &path, std::string &err_msg)
                path.find(broken_disk_dir) == std::string::npos;
     });
 
-    std::string fname = "read_write_test_file";
-    std::string fpath = path_combine(path, fname);
-    if (!create_file(fpath)) {
-        err_msg = fmt::format("Fail to create test file {}.", fpath);
+    static const std::string kTestValue = "test_value";
+    static const std::string kFname = "read_write_test_file";
+    std::string fpath = path_combine(path, kFname);
+    auto cleanup = defer([&fpath]() { remove_path(fpath); });
+    auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+                                        rocksdb::Slice(kTestValue),
+                                        fpath,
+                                        /* should_sync */ true);
+    if (dsn_unlikely(!s.ok())) {
+        err_msg = fmt::format("fail to write file {}, err={}", fpath, s.ToString());
         return false;
     }
 
-    auto cleanup = defer([&fpath]() { remove_path(fpath); });
-    std::string value = "test_value";
-    if (!write_file(fpath, value)) {
-        err_msg = fmt::format("Fail to write file {}.", fpath);
+    std::string read_data;
+    s = rocksdb::ReadFileToString(rocksdb::Env::Default(), fpath, &read_data);
+    if (dsn_unlikely(!s.ok())) {
+        err_msg = fmt::format("fail to read file {}, err={}", fpath, s.ToString());
         return false;
     }
 
-    std::string buf;
-    if (read_file(fpath, buf) != ERR_OK || buf != value) {
-        err_msg = fmt::format("Fail to read file {} or get wrong value({}).", fpath, buf);
+    if (dsn_unlikely(read_data != kTestValue)) {
+        err_msg = fmt::format("get wrong value '{}' from file {}", read_data, fpath);
         return false;
     }
 
diff --git a/src/utils/filesystem.h b/src/utils/filesystem.h
index 9c5d3efea..440128a97 100644
--- a/src/utils/filesystem.h
+++ b/src/utils/filesystem.h
@@ -65,6 +65,8 @@ enum class FileDataType;
 
 namespace filesystem {
 
+// TODO(yingchun): Consider using rocksdb APIs to rewrite the following functions.
+
 int get_normalized_path(const std::string &path, std::string &npath);
 
 bool get_absolute_path(const std::string &path1, std::string &path2);
@@ -136,6 +138,7 @@ bool get_disk_space_info(const std::string &path, disk_space_info &info);
 bool link_file(const std::string &src, const std::string &target);
 
 error_code md5sum(const std::string &file_path, /*out*/ std::string &result);
+error_code deprecated_md5sum(const std::string &file_path, /*out*/ std::string &result);
 
 // return value:
 //  - <A, B>:
@@ -143,25 +146,23 @@ error_code md5sum(const std::string &file_path, /*out*/ std::string &result);
 //          B is represent wheter the directory is empty, true means empty, otherwise false
 std::pair<error_code, bool> is_directory_empty(const std::string &dirname);
 
+// TODO(yingchun): remove it!
 error_code read_file(const std::string &fname, /*out*/ std::string &buf);
 
 // compare file metadata calculated by fname with expected md5 and file_size
 bool verify_file(const std::string &fname,
+                 FileDataType type,
                  const std::string &expected_md5,
                  const int64_t &expected_fsize);
 
-bool verify_file_size(const std::string &fname, const int64_t &expected_fsize);
-
-bool verify_data_md5(const std::string &fname,
-                     const char *data,
-                     const size_t data_size,
-                     const std::string &expected_md5);
+bool verify_file_size(const std::string &fname, FileDataType type, const int64_t &expected_fsize);
 
 // create driectory and get absolute path
 bool create_directory(const std::string &path,
                       /*out*/ std::string &absolute_path,
                       /*out*/ std::string &err_msg);
 
+// TODO(yingchun): remove it!
 bool write_file(const std::string &fname, std::string &buf);
 
 // check if directory is readable and writable
diff --git a/src/utils/test/file_system_test.cpp b/src/utils/test/file_system_test.cpp
index ccfc2da2f..f497a908b 100644
--- a/src/utils/test/file_system_test.cpp
+++ b/src/utils/test/file_system_test.cpp
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// IWYU pragma: no_include <gtest/gtest-param-test.h>
 // IWYU pragma: no_include <gtest/gtest-message.h>
 // IWYU pragma: no_include <gtest/gtest-test-part.h>
 #include <gtest/gtest.h>
@@ -26,6 +27,7 @@
 #include <string>
 
 #include "utils/env.h"
+#include "utils/error_code.h"
 #include "utils/filesystem.h"
 #include "utils/flags.h"
 
@@ -115,18 +117,63 @@ TEST(filesystem_test_p, encrypted_file_size)
     ASSERT_EQ(kFileContentSize + kEncryptionHeaderkSize, actual_file_size);
 }
 
+// The old filesystem API doesn't support sensitive files, so skip testing
+// FLAGS_encrypt_data_at_rest=true.
+TEST(filesystem_test, check_new_md5sum)
+{
+    FLAGS_encrypt_data_at_rest = false;
+
+    struct file_info
+    {
+        int64_t size;
+    } tests[]{{4095}, {4096}, {4097}};
+
+    for (const auto &test : tests) {
+        std::string fname = "test_file";
+        // deprecated_md5sum doesn't support kSensitive files, so use kNonSensitive here.
+        auto s = rocksdb::WriteStringToFile(
+            dsn::utils::PegasusEnv(dsn::utils::FileDataType::kNonSensitive),
+            rocksdb::Slice(std::string(test.size, 'a')),
+            fname,
+            /* should_sync */ true);
+        ASSERT_TRUE(s.ok()) << s.ToString();
+        // Check the file size.
+        int64_t file_fsize;
+        ASSERT_TRUE(file_size(fname, FileDataType::kNonSensitive, file_fsize));
+        ASSERT_EQ(test.size, file_fsize);
+
+        // Get the md5sum.
+        std::string md5sum1;
+        ASSERT_EQ(ERR_OK, md5sum(fname, md5sum1));
+        ASSERT_FALSE(md5sum1.empty());
+
+        // Check the md5sum is repeatable.
+        std::string md5sum2;
+        ASSERT_EQ(ERR_OK, md5sum(fname, md5sum2));
+        ASSERT_EQ(md5sum1, md5sum2);
+
+        // Check the md5sum is the same to deprecated_md5sum.
+        ASSERT_EQ(ERR_OK, deprecated_md5sum(fname, md5sum2));
+        ASSERT_EQ(md5sum1, md5sum2);
+
+        utils::filesystem::remove_path(fname);
+    }
+}
+
 TEST(filesystem_test, verify_file_test)
 {
+    FLAGS_encrypt_data_at_rest = false;
+
     const std::string &fname = "test_file";
     std::string expected_md5;
     int64_t expected_fsize;
     create_file(fname);
     md5sum(fname, expected_md5);
-    file_size(fname, expected_fsize);
+    ASSERT_TRUE(file_size(fname, FileDataType::kNonSensitive, expected_fsize));
 
-    ASSERT_TRUE(verify_file(fname, expected_md5, expected_fsize));
-    ASSERT_FALSE(verify_file(fname, "wrong_md5", 10086));
-    ASSERT_FALSE(verify_file("file_not_exists", "wrong_md5", 10086));
+    ASSERT_TRUE(verify_file(fname, FileDataType::kNonSensitive, expected_md5, expected_fsize));
+    ASSERT_FALSE(verify_file(fname, FileDataType::kNonSensitive, "wrong_md5", 10086));
+    ASSERT_FALSE(verify_file("file_not_exists", FileDataType::kNonSensitive, "wrong_md5", 10086));
 
     remove_path(fname);
 }
diff --git a/src/utils/test/file_utils.cpp b/src/utils/test/file_utils.cpp
index 004e228b4..0cc2173e8 100644
--- a/src/utils/test/file_utils.cpp
+++ b/src/utils/test/file_utils.cpp
@@ -33,6 +33,7 @@
 #include <string>
 #include <vector>
 
+#include "utils/env.h"
 #include "utils/error_code.h"
 #include "utils/filesystem.h"
 
@@ -683,12 +684,12 @@ static void file_utils_test_file_size()
     bool ret;
 
     path = "./file_utils_temp.txt";
-    ret = dsn::utils::filesystem::file_size(path, sz);
-    EXPECT_TRUE(ret);
-    EXPECT_TRUE(sz == 12);
+    ret = dsn::utils::filesystem::file_size(path, dsn::utils::FileDataType::kNonSensitive, sz);
+    ASSERT_TRUE(ret);
+    ASSERT_EQ(12, sz);
 
     path = "./file_utils_temp2.txt";
-    ret = dsn::utils::filesystem::file_size(path, sz);
+    ret = dsn::utils::filesystem::file_size(path, dsn::utils::FileDataType::kNonSensitive, sz);
     EXPECT_FALSE(ret);
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to