This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 73d53dbbb refactor(hdfs): use rocksdb API to read/write file (#1618)
73d53dbbb is described below

commit 73d53dbbb923fc15a7b3b5f4def8186c2fc97c73
Author: Yingchun Lai <[email protected]>
AuthorDate: Mon Sep 25 16:45:15 2023 +0800

    refactor(hdfs): use rocksdb API to read/write file (#1618)
    
    https://github.com/apache/incubator-pegasus/issues/887
    
    There are no functional changes; this only refactors the HDFS service and
    its related unit tests.
    The directio_writable_file.* files have been removed because their
    functionality is now provided by the rocksdb API options.
---
 src/block_service/directio_writable_file.cpp | 169 -------------
 src/block_service/directio_writable_file.h   |  57 -----
 src/block_service/hdfs/CMakeLists.txt        |   8 +-
 src/block_service/hdfs/hdfs_service.cpp      | 181 ++++++++------
 src/block_service/test/config-test.ini       |   4 +-
 src/block_service/test/hdfs_service_test.cpp | 358 ++++++++++++++++-----------
 src/replica/CMakeLists.txt                   |   2 +-
 src/utils/env.cpp                            |   8 +
 src/utils/env.h                              |   4 +-
 9 files changed, 333 insertions(+), 458 deletions(-)

diff --git a/src/block_service/directio_writable_file.cpp 
b/src/block_service/directio_writable_file.cpp
deleted file mode 100644
index 2c74ae05d..000000000
--- a/src/block_service/directio_writable_file.cpp
+++ /dev/null
@@ -1,169 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "block_service/directio_writable_file.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <stdlib.h> // posix_memalign
-#include <sys/stat.h>
-#include <unistd.h> // getpagesize
-#include <algorithm>
-#include <cstring>
-#include <string>
-
-#include "utils/flags.h"
-#include "utils/fmt_logging.h"
-#include "utils/safe_strerror_posix.h"
-
-namespace dsn {
-namespace dist {
-namespace block_service {
-
-DSN_DEFINE_uint32(replication,
-                  direct_io_buffer_pages,
-                  64,
-                  "Number of pages we need to set to direct io buffer");
-DSN_TAG_VARIABLE(direct_io_buffer_pages, FT_MUTABLE);
-
-DSN_DEFINE_bool(replication,
-                enable_direct_io,
-                false,
-                "Whether to enable direct I/O when download files");
-DSN_TAG_VARIABLE(enable_direct_io, FT_MUTABLE);
-
-const uint32_t g_page_size = getpagesize();
-
-direct_io_writable_file::direct_io_writable_file(const std::string &file_path)
-    : _file_path(file_path),
-      _fd(-1),
-      _file_size(0),
-      _buffer(nullptr),
-      _buffer_size(FLAGS_direct_io_buffer_pages * g_page_size),
-      _offset(0)
-{
-}
-
-direct_io_writable_file::~direct_io_writable_file()
-{
-    if (!_buffer || _fd < 0) {
-        return;
-    }
-    // Here is an ensurance, users shuold call finalize manually
-    CHECK_EQ_MSG(_offset, 0, "finalize() should be called before destructor");
-
-    ::free(_buffer);
-    CHECK_EQ_MSG(
-        0, ::close(_fd), "Failed to close {}, err = {}", _file_path, 
utils::safe_strerror(errno));
-}
-
-bool direct_io_writable_file::initialize()
-{
-    if (posix_memalign(&_buffer, g_page_size, _buffer_size) != 0) {
-        LOG_ERROR("Allocate memaligned buffer failed, err = {}", 
utils::safe_strerror(errno));
-        return false;
-    }
-
-    int flag = O_WRONLY | O_TRUNC | O_CREAT;
-#if !defined(__APPLE__)
-    flag |= O_DIRECT;
-#endif
-    // TODO(yingchun): there maybe serious error of the disk driver when these 
system call failed,
-    // maybe just terminate the process or mark the disk as failed would be 
better
-    _fd = ::open(_file_path.c_str(), flag, S_IRUSR | S_IWUSR | S_IRGRP);
-    if (_fd < 0) {
-        LOG_ERROR("Failed to open {} with flag {}, err = {}",
-                  _file_path,
-                  flag,
-                  utils::safe_strerror(errno));
-        ::free(_buffer);
-        _buffer = nullptr;
-        return false;
-    }
-    return true;
-}
-
-bool direct_io_writable_file::finalize()
-{
-    CHECK(_buffer && _fd >= 0, "Initialize the instance first");
-
-    if (_offset > 0) {
-        ssize_t written_bytes = ::write(_fd, _buffer, _buffer_size);
-        if (dsn_unlikely(written_bytes < 0)) {
-            LOG_ERROR("Failed to write the last chunk, file_path = {}, err = 
{}",
-                      _file_path,
-                      utils::safe_strerror(errno));
-            return false;
-        }
-        // TODO(yingchun): would better to retry
-        if (dsn_unlikely(written_bytes != _buffer_size)) {
-            LOG_ERROR("Failed to write the last chunk, file_path = {}, data 
bytes = {}, written "
-                      "bytes = {}",
-                      _file_path,
-                      _buffer_size,
-                      written_bytes);
-            return false;
-        }
-        _offset = 0;
-        if (::ftruncate(_fd, _file_size) < 0) {
-            LOG_ERROR("Failed to truncate {}, err = {}", _file_path, 
utils::safe_strerror(errno));
-            return false;
-        }
-    }
-    return true;
-}
-
-bool direct_io_writable_file::write(const char *s, size_t n)
-{
-    CHECK(_buffer && _fd >= 0, "Initialize the instance first");
-
-    uint32_t remaining = n;
-    while (remaining > 0) {
-        uint32_t bytes = std::min((_buffer_size - _offset), remaining);
-        memcpy((char *)_buffer + _offset, s, bytes);
-        _offset += bytes;
-        remaining -= bytes;
-        s += bytes;
-        // buffer is full, flush to file
-        if (_offset == _buffer_size) {
-            ssize_t written_bytes = ::write(_fd, _buffer, _buffer_size);
-            if (dsn_unlikely(written_bytes < 0)) {
-                LOG_ERROR("Failed to write chunk, file_path = {}, err = {}",
-                          _file_path,
-                          utils::safe_strerror(errno));
-                return false;
-            }
-            // TODO(yingchun): would better to retry
-            if (dsn_unlikely(written_bytes != _buffer_size)) {
-                LOG_ERROR(
-                    "Failed to write chunk, file_path = {}, data bytes = {}, 
written bytes = {}",
-                    _file_path,
-                    _buffer_size,
-                    written_bytes);
-                return false;
-            }
-            // reset offset
-            _offset = 0;
-        }
-    }
-    _file_size += n;
-    return true;
-}
-
-} // namespace block_service
-} // namespace dist
-} // namespace dsn
diff --git a/src/block_service/directio_writable_file.h 
b/src/block_service/directio_writable_file.h
deleted file mode 100644
index d4f99949f..000000000
--- a/src/block_service/directio_writable_file.h
+++ /dev/null
@@ -1,57 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stddef.h>
-#include <stdint.h>
-#include <string>
-
-#include "utils/ports.h"
-
-namespace dsn {
-namespace dist {
-namespace block_service {
-
-class direct_io_writable_file
-{
-public:
-    explicit direct_io_writable_file(const std::string &file_path);
-    ~direct_io_writable_file();
-
-    bool initialize();
-    bool write(const char *s, size_t n);
-    bool finalize();
-
-private:
-    DISALLOW_COPY_AND_ASSIGN(direct_io_writable_file);
-
-    std::string _file_path;
-    int _fd;
-    uint32_t _file_size;
-
-    // page size aligned buffer
-    void *_buffer;
-    // buffer size
-    uint32_t _buffer_size;
-    // buffer offset
-    uint32_t _offset;
-};
-
-} // namespace block_service
-} // namespace dist
-} // namespace dsn
diff --git a/src/block_service/hdfs/CMakeLists.txt 
b/src/block_service/hdfs/CMakeLists.txt
index 803e85bec..2bba8b96b 100644
--- a/src/block_service/hdfs/CMakeLists.txt
+++ b/src/block_service/hdfs/CMakeLists.txt
@@ -17,20 +17,16 @@
 
 set(MY_PROJ_NAME dsn.block_service.hdfs)
 
-set(DIRECTIO_SRC
-        ../directio_writable_file.cpp
-        )
-
 #Source files under CURRENT project directory will be automatically included.
 #You can manually set MY_PROJ_SRC to include source files under other 
directories.
-set(MY_PROJ_SRC "${DIRECTIO_SRC}")
+set(MY_PROJ_SRC "")
 
 #Search mode for source files under CURRENT project directory ?
 #"GLOB_RECURSE" for recursive search
 #"GLOB" for non - recursive search
 set(MY_SRC_SEARCH_MODE "GLOB")
 
-set(MY_PROJ_LIBS hdfs)
+set(MY_PROJ_LIBS hdfs rocksdb)
 
 #Extra files that will be installed
 set(MY_BINPLACES "")
diff --git a/src/block_service/hdfs/hdfs_service.cpp 
b/src/block_service/hdfs/hdfs_service.cpp
index 459108e11..5909f40de 100644
--- a/src/block_service/hdfs/hdfs_service.cpp
+++ b/src/block_service/hdfs/hdfs_service.cpp
@@ -17,19 +17,21 @@
 
 #include <errno.h>
 #include <fcntl.h>
+#include <rocksdb/env.h>
 #include <algorithm>
-#include <fstream>
 #include <type_traits>
 #include <utility>
 
-#include "block_service/directio_writable_file.h"
 #include "hdfs/hdfs.h"
 #include "hdfs_service.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
 #include "runtime/task/async_calls.h"
 #include "runtime/task/task.h"
 #include "utils/TokenBucket.h"
 #include "utils/autoref_ptr.h"
 #include "utils/blob.h"
+#include "utils/env.h"
 #include "utils/error_code.h"
 #include "utils/filesystem.h"
 #include "utils/flags.h"
@@ -37,6 +39,8 @@
 #include "utils/safe_strerror_posix.h"
 #include "utils/strings.h"
 
+DSN_DECLARE_bool(enable_direct_io);
+
 struct hdfsBuilder;
 
 namespace dsn {
@@ -65,8 +69,6 @@ DSN_DEFINE_uint64(replication,
                   "hdfs write batch size, the default value is 64MB");
 DSN_TAG_VARIABLE(hdfs_write_batch_size_bytes, FT_MUTABLE);
 
-DSN_DECLARE_bool(enable_direct_io);
-
 hdfs_service::hdfs_service()
 {
     _read_token_bucket.reset(new folly::DynamicTokenBucket());
@@ -108,12 +110,12 @@ error_code hdfs_service::create_fs()
     hdfsBuilderSetNameNode(builder, _hdfs_name_node.c_str());
     _fs = hdfsBuilderConnect(builder);
     if (!_fs) {
-        LOG_ERROR("Fail to connect hdfs name node {}, error: {}.",
+        LOG_ERROR("Fail to connect HDFS name node {}, error: {}.",
                   _hdfs_name_node,
                   utils::safe_strerror(errno));
         return ERR_FS_INTERNAL;
     }
-    LOG_INFO("Succeed to connect hdfs name node {}.", _hdfs_name_node);
+    LOG_INFO("Succeed to connect HDFS name node {}.", _hdfs_name_node);
     return ERR_OK;
 }
 
@@ -122,10 +124,10 @@ void hdfs_service::close()
     // This method should be carefully called.
     // Calls to hdfsDisconnect() by individual threads would terminate
     // all other connections handed out via hdfsConnect() to the same URI.
-    LOG_INFO("Try to disconnect hdfs.");
+    LOG_INFO("Try to disconnect HDFS.");
     int result = hdfsDisconnect(_fs);
     if (result == -1) {
-        LOG_ERROR("Fail to disconnect from the hdfs file system, error: {}.",
+        LOG_ERROR("Fail to disconnect from the HDFS file system, error: {}.",
                   utils::safe_strerror(errno));
     }
     // Even if there is an error, the resources associated with the hdfsFS 
will be freed.
@@ -134,7 +136,7 @@ void hdfs_service::close()
 
 std::string hdfs_service::get_hdfs_entry_name(const std::string &hdfs_path)
 {
-    // get exact file name from an hdfs path.
+    // get exact file name from an HDFS path.
     int pos = hdfs_path.find_last_of("/");
     return hdfs_path.substr(pos + 1);
 }
@@ -305,7 +307,7 @@ error_code hdfs_file_object::write_data_in_batches(const 
char *data,
     hdfsFile write_file =
         hdfsOpenFile(_service->get_fs(), file_name().c_str(), O_WRONLY | 
O_CREAT, 0, 0, 0);
     if (!write_file) {
-        LOG_ERROR("Failed to open hdfs file {} for writting, error: {}.",
+        LOG_ERROR("Failed to open HDFS file {} for writting, error: {}.",
                   file_name(),
                   utils::safe_strerror(errno));
         return ERR_FS_INTERNAL;
@@ -323,7 +325,7 @@ error_code hdfs_file_object::write_data_in_batches(const 
char *data,
                                             (void *)(data + cur_pos),
                                             static_cast<tSize>(write_len));
         if (num_written_bytes == -1) {
-            LOG_ERROR("Failed to write hdfs file {}, error: {}.",
+            LOG_ERROR("Failed to write HDFS file {}, error: {}.",
                       file_name(),
                       utils::safe_strerror(errno));
             hdfsCloseFile(_service->get_fs(), write_file);
@@ -333,18 +335,18 @@ error_code hdfs_file_object::write_data_in_batches(const 
char *data,
     }
     if (hdfsHFlush(_service->get_fs(), write_file) != 0) {
         LOG_ERROR(
-            "Failed to flush hdfs file {}, error: {}.", file_name(), 
utils::safe_strerror(errno));
+            "Failed to flush HDFS file {}, error: {}.", file_name(), 
utils::safe_strerror(errno));
         hdfsCloseFile(_service->get_fs(), write_file);
         return ERR_FS_INTERNAL;
     }
     written_size = cur_pos;
     if (hdfsCloseFile(_service->get_fs(), write_file) != 0) {
         LOG_ERROR(
-            "Failed to close hdfs file {}, error: {}", file_name(), 
utils::safe_strerror(errno));
+            "Failed to close HDFS file {}, error: {}", file_name(), 
utils::safe_strerror(errno));
         return ERR_FS_INTERNAL;
     }
 
-    LOG_INFO("start to synchronize meta data after successfully wrote data to 
hdfs");
+    LOG_INFO("start to synchronize meta data after successfully wrote data to 
HDFS");
     return get_file_meta();
 }
 
@@ -376,23 +378,51 @@ dsn::task_ptr hdfs_file_object::upload(const 
upload_request &req,
 
     add_ref();
     auto upload_background = [this, req, t]() {
+        LOG_INFO("start to upload from '{}' to '{}'", req.input_local_name, 
file_name());
+
         upload_response resp;
-        resp.uploaded_size = 0;
-        std::ifstream is(req.input_local_name, std::ios::binary | 
std::ios::in);
-        if (is.is_open()) {
-            int64_t file_sz = 0;
-            dsn::utils::filesystem::file_size(req.input_local_name, file_sz);
-            std::unique_ptr<char[]> buffer(new char[file_sz]);
-            is.read(buffer.get(), file_sz);
-            is.close();
-            resp.err = write_data_in_batches(buffer.get(), file_sz, 
resp.uploaded_size);
-        } else {
-            LOG_ERROR("HDFS upload failed: open local file {} failed when 
upload to {}, error: {}",
-                      req.input_local_name,
-                      file_name(),
-                      utils::safe_strerror(errno));
-            resp.err = dsn::ERR_FILE_OPERATION_FAILED;
-        }
+        do {
+            rocksdb::EnvOptions env_options;
+            env_options.use_direct_reads = FLAGS_enable_direct_io;
+            std::unique_ptr<rocksdb::SequentialFile> rfile;
+            auto s = rocksdb::Env::Default()->NewSequentialFile(
+                req.input_local_name, &rfile, env_options);
+            if (!s.ok()) {
+                LOG_ERROR(
+                    "open local file '{}' failed, err = {}", 
req.input_local_name, s.ToString());
+                resp.err = ERR_FILE_OPERATION_FAILED;
+                break;
+            }
+
+            int64_t file_size;
+            if (!dsn::utils::filesystem::file_size(
+                    req.input_local_name, 
dsn::utils::FileDataType::kSensitive, file_size)) {
+                LOG_ERROR("get size of local file '{}' failed", 
req.input_local_name);
+                resp.err = ERR_FILE_OPERATION_FAILED;
+                break;
+            }
+
+            rocksdb::Slice result;
+            char scratch[file_size];
+            s = rfile->Read(file_size, &result, scratch);
+            if (!s.ok()) {
+                LOG_ERROR(
+                    "read local file '{}' failed, err = {}", 
req.input_local_name, s.ToString());
+                resp.err = ERR_FILE_OPERATION_FAILED;
+                break;
+            }
+
+            resp.err = write_data_in_batches(result.data(), result.size(), 
resp.uploaded_size);
+            if (resp.err != ERR_OK) {
+                LOG_ERROR("write data to remote '{}' failed, err = {}", 
file_name(), resp.err);
+                break;
+            }
+
+            LOG_INFO("finish to upload from '{}' to '{}', size = {}",
+                     req.input_local_name,
+                     file_name(),
+                     resp.uploaded_size);
+        } while (false);
         t->enqueue_with(resp);
         release_ref();
     };
@@ -417,7 +447,7 @@ error_code hdfs_file_object::read_data_in_batches(uint64_t 
start_pos,
 
     hdfsFile read_file = hdfsOpenFile(_service->get_fs(), file_name().c_str(), 
O_RDONLY, 0, 0, 0);
     if (!read_file) {
-        LOG_ERROR("Failed to open hdfs file {} for reading, error: {}.",
+        LOG_ERROR("Failed to open HDFS file {} for reading, error: {}.",
                   file_name(),
                   utils::safe_strerror(errno));
         return ERR_FS_INTERNAL;
@@ -446,7 +476,7 @@ error_code hdfs_file_object::read_data_in_batches(uint64_t 
start_pos,
             cur_pos += num_read_bytes;
             dst_buf += num_read_bytes;
         } else if (num_read_bytes == -1) {
-            LOG_ERROR("Failed to read hdfs file {}, error: {}.",
+            LOG_ERROR("Failed to read HDFS file {}, error: {}.",
                       file_name(),
                       utils::safe_strerror(errno));
             read_success = false;
@@ -455,7 +485,7 @@ error_code hdfs_file_object::read_data_in_batches(uint64_t 
start_pos,
     }
     if (hdfsCloseFile(_service->get_fs(), read_file) != 0) {
         LOG_ERROR(
-            "Failed to close hdfs file {}, error: {}.", file_name(), 
utils::safe_strerror(errno));
+            "Failed to close HDFS file {}, error: {}.", file_name(), 
utils::safe_strerror(errno));
         return ERR_FS_INTERNAL;
     }
     if (read_success) {
@@ -504,48 +534,53 @@ dsn::task_ptr hdfs_file_object::download(const 
download_request &req,
     auto download_background = [this, req, t]() {
         download_response resp;
         resp.downloaded_size = 0;
-        std::string read_buffer;
-        size_t read_length = 0;
-        resp.err =
-            read_data_in_batches(req.remote_pos, req.remote_length, 
read_buffer, read_length);
-        if (resp.err == ERR_OK) {
-            bool write_succ = false;
-            if (FLAGS_enable_direct_io) {
-                auto dio_file = 
std::make_unique<direct_io_writable_file>(req.output_local_name);
-                do {
-                    if (!dio_file->initialize()) {
-                        break;
-                    }
-                    bool wr_ret = dio_file->write(read_buffer.c_str(), 
read_length);
-                    if (!wr_ret) {
-                        break;
-                    }
-                    if (dio_file->finalize()) {
-                        resp.downloaded_size = read_length;
-                        resp.file_md5 = utils::string_md5(read_buffer.c_str(), 
read_length);
-                        write_succ = true;
-                    }
-                } while (0);
-            } else {
-                std::ofstream out(req.output_local_name,
-                                  std::ios::binary | std::ios::out | 
std::ios::trunc);
-                if (out.is_open()) {
-                    out.write(read_buffer.c_str(), read_length);
-                    out.close();
-                    resp.downloaded_size = read_length;
-                    resp.file_md5 = utils::string_md5(read_buffer.c_str(), 
read_length);
-                    write_succ = true;
-                }
+        resp.err = ERR_OK;
+        bool write_succ = false;
+        std::string target_file = req.output_local_name;
+        do {
+            LOG_INFO("start to download from '{}' to '{}'", file_name(), 
target_file);
+
+            std::string read_buffer;
+            size_t read_length = 0;
+            resp.err =
+                read_data_in_batches(req.remote_pos, req.remote_length, 
read_buffer, read_length);
+            if (resp.err != ERR_OK) {
+                LOG_ERROR("read data from remote '{}' failed, err = {}", 
file_name(), resp.err);
+                break;
             }
-            if (!write_succ) {
-                LOG_ERROR("HDFS download failed: fail to open localfile {} 
when download {}, "
-                          "error: {}",
-                          req.output_local_name,
-                          file_name(),
-                          utils::safe_strerror(errno));
-                resp.err = ERR_FILE_OPERATION_FAILED;
-                resp.downloaded_size = 0;
+
+            rocksdb::EnvOptions env_options;
+            env_options.use_direct_writes = FLAGS_enable_direct_io;
+            std::unique_ptr<rocksdb::WritableFile> wfile;
+            auto s = rocksdb::Env::Default()->NewWritableFile(target_file, 
&wfile, env_options);
+            if (!s.ok()) {
+                LOG_ERROR("create local file '{}' failed, err = {}", 
target_file, s.ToString());
+                break;
+            }
+
+            s = wfile->Append(rocksdb::Slice(read_buffer.data(), read_length));
+            if (!s.ok()) {
+                LOG_ERROR("append local file '{}' failed, err = {}", 
target_file, s.ToString());
+                break;
+            }
+
+            s = wfile->Fsync();
+            if (!s.ok()) {
+                LOG_ERROR("fsync local file '{}' failed, err = {}", 
target_file, s.ToString());
+                break;
             }
+
+            resp.downloaded_size = read_length;
+            resp.file_md5 = utils::string_md5(read_buffer.c_str(), 
read_length);
+            write_succ = true;
+        } while (false);
+
+        if (!write_succ) {
+            LOG_ERROR("HDFS download failed: fail to write local file {} when 
download {}",
+                      target_file,
+                      file_name());
+            resp.err = ERR_FILE_OPERATION_FAILED;
+            resp.downloaded_size = 0;
         }
         t->enqueue_with(resp);
         release_ref();
diff --git a/src/block_service/test/config-test.ini 
b/src/block_service/test/config-test.ini
index c1996d551..2acb86e2b 100644
--- a/src/block_service/test/config-test.ini
+++ b/src/block_service/test/config-test.ini
@@ -53,7 +53,7 @@ max_size = 150
 worker_count = 8
 
 [hdfs_test]
-test_name_node = <hdfs_name_none>
-test_backup_path = <hdfs_path>
+test_name_node =
+test_backup_path =
 num_test_file_lines = 4096
 num_total_files_for_hdfs_concurrent_test = 64
diff --git a/src/block_service/test/hdfs_service_test.cpp 
b/src/block_service/test/hdfs_service_test.cpp
index 6cfcfb379..d7b33a8da 100644
--- a/src/block_service/test/hdfs_service_test.cpp
+++ b/src/block_service/test/hdfs_service_test.cpp
@@ -15,15 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <fmt/core.h>
+#include <gtest/gtest-param-test.h>
 // IWYU pragma: no_include <gtest/gtest-message.h>
 // IWYU pragma: no_include <gtest/gtest-test-part.h>
 #include <gtest/gtest.h>
+#include <rocksdb/env.h>
+#include <rocksdb/slice.h>
+#include <rocksdb/status.h>
 #include <stdio.h>
 #include <string.h>
 #include <unistd.h>
 #include <algorithm>
 #include <cstdint>
-#include <fstream>
+#include <iostream>
 #include <memory>
 #include <string>
 #include <vector>
@@ -35,27 +40,23 @@
 #include "runtime/task/task.h"
 #include "runtime/task/task_code.h"
 #include "runtime/task/task_tracker.h"
+#include "test_util/test_util.h"
 #include "utils/autoref_ptr.h"
 #include "utils/blob.h"
 #include "utils/enum_helper.h"
+#include "utils/env.h"
 #include "utils/error_code.h"
 #include "utils/filesystem.h"
 #include "utils/flags.h"
-#include "utils/strings.h"
+#include "utils/fmt_logging.h"
+#include "utils/test_macros.h"
 #include "utils/threadpool_code.h"
 
 using namespace dsn;
 using namespace dsn::dist::block_service;
 
-static std::string example_name_node = "<hdfs_name_none>";
-static std::string example_backup_path = "<hdfs_path>";
-// Please modify following paras in 'config-test.ini' to enable 
hdfs_service_test,
-// or hdfs_service_test will be skipped and return true.
-DSN_DEFINE_string(hdfs_test, test_name_node, "<hdfs_name_none>", "hdfs name 
node");
-DSN_DEFINE_string(hdfs_test,
-                  test_backup_path,
-                  "<hdfs_path>",
-                  "path for uploading and downloading test files");
+DSN_DEFINE_string(hdfs_test, test_name_node, "", "hdfs name node");
+DSN_DEFINE_string(hdfs_test, test_backup_path, "", "path for uploading and 
downloading test files");
 
 DSN_DEFINE_uint32(hdfs_test, num_test_file_lines, 4096, "number of lines in 
test file");
 DSN_DEFINE_uint32(hdfs_test,
@@ -65,206 +66,257 @@ DSN_DEFINE_uint32(hdfs_test,
 
 DEFINE_TASK_CODE(LPC_TEST_HDFS, TASK_PRIORITY_HIGH, dsn::THREAD_POOL_DEFAULT)
 
-class HDFSClientTest : public testing::Test
+class HDFSClientTest : public pegasus::encrypt_data_test_base
 {
 protected:
-    virtual void SetUp() override;
-    virtual void TearDown() override;
-    void generate_test_file(const char *filename);
-    void write_test_files_async(task_tracker *tracker);
-    std::string name_node;
-    std::string backup_path;
-    std::string local_test_dir;
-    std::string test_data_str;
+    void generate_test_file(const std::string &filename);
+    void write_test_files_async(const std::string &local_test_path,
+                                int total_files,
+                                int *success_count,
+                                task_tracker *tracker);
 };
 
-void HDFSClientTest::SetUp()
+void HDFSClientTest::generate_test_file(const std::string &filename)
 {
-    name_node = FLAGS_test_name_node;
-    backup_path = FLAGS_test_backup_path;
-    local_test_dir = "test_dir";
-    test_data_str = "";
-    for (int i = 0; i < FLAGS_num_test_file_lines; ++i) {
-        test_data_str += "test";
+    int lines = FLAGS_num_test_file_lines;
+    std::unique_ptr<rocksdb::WritableFile> wfile;
+    auto s = rocksdb::Env::Default()->NewWritableFile(filename, &wfile, 
rocksdb::EnvOptions());
+    ASSERT_TRUE(s.ok()) << s.ToString();
+    for (int i = 0; i < lines; ++i) {
+        rocksdb::Slice data(fmt::format("{:04}d_this_is_a_simple_test_file\n", 
i));
+        s = wfile->Append(data);
+        ASSERT_TRUE(s.ok()) << s.ToString();
     }
+    s = wfile->Fsync();
+    ASSERT_TRUE(s.ok()) << s.ToString();
 }
 
-void HDFSClientTest::TearDown() {}
-
-void HDFSClientTest::generate_test_file(const char *filename)
+void HDFSClientTest::write_test_files_async(const std::string &local_test_path,
+                                            int total_files,
+                                            int *success_count,
+                                            task_tracker *tracker)
 {
-    // generate a local test file.
-    int lines = FLAGS_num_test_file_lines;
-    FILE *fp = fopen(filename, "wb");
-    for (int i = 0; i < lines; ++i) {
-        fprintf(fp, "%04d_this_is_a_simple_test_file\n", i);
+    dsn::utils::filesystem::create_directory(local_test_path);
+    std::string local_test_data;
+    for (int i = 0; i < FLAGS_num_test_file_lines; ++i) {
+        local_test_data += "test";
+    }
+    for (int i = 0; i < total_files; ++i) {
+        tasking::enqueue(
+            LPC_TEST_HDFS, tracker, [&local_test_path, &local_test_data, i, 
success_count]() {
+                // mock the writing process in hdfs_file_object::download().
+                std::string test_file_name = local_test_path + "/test_file_" + 
std::to_string(i);
+                auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+                                                    
rocksdb::Slice(local_test_data),
+                                                    test_file_name,
+                                                    /* should_sync */ true);
+                if (s.ok()) {
+                    ++(*success_count);
+                } else {
+                    CHECK(s.IsIOError(), "{}", s.ToString());
+                    auto pos1 = s.ToString().find(
+                        "IO error: No such file or directory: While open a 
file for appending: ");
+                    auto pos2 = s.ToString().find("IO error: While appending 
to file: ");
+                    CHECK(pos1 == 0 || pos2 == 0, "{}", s.ToString());
+                }
+            });
     }
-    fclose(fp);
 }
 
-void HDFSClientTest::write_test_files_async(task_tracker *tracker)
+// TODO(yingchun): ENCRYPTION: add enable encryption test.
+INSTANTIATE_TEST_CASE_P(, HDFSClientTest, ::testing::Values(false));
+
+TEST_P(HDFSClientTest, test_hdfs_read_write)
 {
-    dsn::utils::filesystem::create_directory(local_test_dir);
-    for (int i = 0; i < 100; ++i) {
-        tasking::enqueue(LPC_TEST_HDFS, tracker, [this, i]() {
-            // mock the writing process in hdfs_file_object::download().
-            std::string test_file_name = local_test_dir + "/test_file_" + 
std::to_string(i);
-            std::ofstream out(test_file_name, std::ios::binary | std::ios::out 
| std::ios::trunc);
-            if (out.is_open()) {
-                out.write(test_data_str.c_str(), test_data_str.length());
-            }
-            out.close();
-        });
+    if (strlen(FLAGS_test_name_node) == 0 || strlen(FLAGS_test_backup_path) == 
0) {
+        // TODO(yingchun): use GTEST_SKIP after upgrading gtest.
+        std::cout << "Set hdfs_test.* configs in config-test.ini to enable 
hdfs_service_test."
+                  << std::endl;
+        return;
     }
+
+    auto s = std::make_shared<hdfs_service>();
+    ASSERT_EQ(dsn::ERR_OK, s->initialize({FLAGS_test_name_node, 
FLAGS_test_backup_path}));
+
+    const std::string kRemoteTestPath = "hdfs_client_test";
+    const std::string kRemoteTestRWFile = kRemoteTestPath + "/test_write_file";
+    const std::string kTestBuffer = "write_hello_world_for_test";
+    const int kTestBufferLength = kTestBuffer.size();
+
+    // 1. clean up all old files in the remote test directory.
+    printf("clean up all old files.\n");
+    remove_path_response rem_resp;
+    s->remove_path(remove_path_request{kRemoteTestPath, true},
+                   LPC_TEST_HDFS,
+                   [&rem_resp](const remove_path_response &resp) { rem_resp = 
resp; },
+                   nullptr)
+        ->wait();
+    ASSERT_TRUE(dsn::ERR_OK == rem_resp.err || dsn::ERR_OBJECT_NOT_FOUND == 
rem_resp.err);
+
+    // 2. create file.
+    printf("test write operation.\n");
+    create_file_response cf_resp;
+    s->create_file(create_file_request{kRemoteTestRWFile, false},
+                   LPC_TEST_HDFS,
+                   [&cf_resp](const create_file_response &r) { cf_resp = r; },
+                   nullptr)
+        ->wait();
+    ASSERT_EQ(dsn::ERR_OK, cf_resp.err);
+
+    // 3. write file.
+    dsn::blob bb(kTestBuffer.c_str(), 0, kTestBufferLength);
+    write_response w_resp;
+    cf_resp.file_handle
+        ->write(write_request{bb},
+                LPC_TEST_HDFS,
+                [&w_resp](const write_response &w) { w_resp = w; },
+                nullptr)
+        ->wait();
+    ASSERT_EQ(dsn::ERR_OK, w_resp.err);
+    ASSERT_EQ(kTestBufferLength, w_resp.written_size);
+    ASSERT_EQ(kTestBufferLength, cf_resp.file_handle->get_size());
+
+    // 4. read file.
+    printf("test read just written contents.\n");
+    read_response r_resp;
+    cf_resp.file_handle
+        ->read(read_request{0, -1},
+               LPC_TEST_HDFS,
+               [&r_resp](const read_response &r) { r_resp = r; },
+               nullptr)
+        ->wait();
+    ASSERT_EQ(dsn::ERR_OK, r_resp.err);
+    ASSERT_EQ(kTestBufferLength, r_resp.buffer.length());
+    ASSERT_EQ(kTestBuffer, r_resp.buffer.to_string());
+
+    // 5. partial read.
+    const uint64_t kOffset = 5;
+    const int64_t kSize = 10;
+    cf_resp.file_handle
+        ->read(read_request{kOffset, kSize},
+               LPC_TEST_HDFS,
+               [&r_resp](const read_response &r) { r_resp = r; },
+               nullptr)
+        ->wait();
+    ASSERT_EQ(dsn::ERR_OK, r_resp.err);
+    ASSERT_EQ(kSize, r_resp.buffer.length());
+    ASSERT_EQ(kTestBuffer.substr(kOffset, kSize), r_resp.buffer.to_string());
 }
 
-TEST_F(HDFSClientTest, test_basic_operation)
+TEST_P(HDFSClientTest, test_upload_and_download)
 {
-    if (name_node == example_name_node || backup_path == example_backup_path) {
+    if (strlen(FLAGS_test_name_node) == 0 || strlen(FLAGS_test_backup_path) == 
0) {
+        // TODO(yingchun): use GTEST_SKIP after upgrading gtest.
+        std::cout << "Set hdfs_test.* configs in config-test.ini to enable 
hdfs_service_test."
+                  << std::endl;
         return;
     }
 
-    std::vector<std::string> args = {name_node, backup_path};
-    std::shared_ptr<hdfs_service> s = std::make_shared<hdfs_service>();
-    ASSERT_EQ(dsn::ERR_OK, s->initialize(args));
+    auto s = std::make_shared<hdfs_service>();
+    ASSERT_EQ(dsn::ERR_OK, s->initialize({FLAGS_test_name_node, 
FLAGS_test_backup_path}));
 
-    std::string local_test_file = "test_file";
-    std::string remote_test_file = "hdfs_client_test/test_file";
-    int64_t test_file_size = 0;
+    const std::string kLocalFile = "test_file";
+    const std::string kRemoteTestPath = "hdfs_client_test";
+    const std::string kRemoteTestFile = kRemoteTestPath + "/test_file";
 
-    generate_test_file(local_test_file.c_str());
-    dsn::utils::filesystem::file_size(local_test_file, test_file_size);
+    // 0. generate test file.
+    NO_FATALS(generate_test_file(kLocalFile));
+    int64_t local_file_size = 0;
+    ASSERT_TRUE(dsn::utils::filesystem::file_size(
+        kLocalFile, dsn::utils::FileDataType::kSensitive, local_file_size));
+    std::string local_file_md5sum;
+    dsn::utils::filesystem::md5sum(kLocalFile, local_file_md5sum);
 
-    // fisrt clean up all old file in test directory.
+    // 1. clean up all old files in the remote test directory.
     printf("clean up all old files.\n");
     remove_path_response rem_resp;
-    s->remove_path(remove_path_request{"hdfs_client_test", true},
+    s->remove_path(remove_path_request{kRemoteTestPath, true},
                    LPC_TEST_HDFS,
                    [&rem_resp](const remove_path_response &resp) { rem_resp = 
resp; },
                    nullptr)
         ->wait();
     ASSERT_TRUE(dsn::ERR_OK == rem_resp.err || dsn::ERR_OBJECT_NOT_FOUND == 
rem_resp.err);
 
-    // test upload file.
-    printf("create and upload: %s.\n", remote_test_file.c_str());
+    // 2. create file.
+    printf("create and upload: %s.\n", kRemoteTestFile.c_str());
     create_file_response cf_resp;
-    s->create_file(create_file_request{remote_test_file, true},
+    s->create_file(create_file_request{kRemoteTestFile, true},
                    LPC_TEST_HDFS,
                    [&cf_resp](const create_file_response &r) { cf_resp = r; },
                    nullptr)
         ->wait();
-    ASSERT_EQ(cf_resp.err, dsn::ERR_OK);
+    ASSERT_EQ(dsn::ERR_OK, cf_resp.err);
+
+    // 3. upload file.
     upload_response u_resp;
     cf_resp.file_handle
-        ->upload(upload_request{local_test_file},
+        ->upload(upload_request{kLocalFile},
                  LPC_TEST_HDFS,
                  [&u_resp](const upload_response &r) { u_resp = r; },
                  nullptr)
         ->wait();
     ASSERT_EQ(dsn::ERR_OK, u_resp.err);
-    ASSERT_EQ(test_file_size, cf_resp.file_handle->get_size());
+    ASSERT_EQ(local_file_size, cf_resp.file_handle->get_size());
 
-    // test list directory.
+    // 4. list directory.
     ls_response l_resp;
-    s->list_dir(ls_request{"hdfs_client_test"},
+    s->list_dir(ls_request{kRemoteTestPath},
                 LPC_TEST_HDFS,
                 [&l_resp](const ls_response &resp) { l_resp = resp; },
                 nullptr)
         ->wait();
     ASSERT_EQ(dsn::ERR_OK, l_resp.err);
     ASSERT_EQ(1, l_resp.entries->size());
-    ASSERT_EQ("test_file", l_resp.entries->at(0).entry_name);
+    ASSERT_EQ(kLocalFile, l_resp.entries->at(0).entry_name);
     ASSERT_EQ(false, l_resp.entries->at(0).is_directory);
 
-    // test download file.
+    // 5. download file.
     download_response d_resp;
-    printf("test download %s.\n", remote_test_file.c_str());
-    s->create_file(create_file_request{remote_test_file, false},
+    printf("test download %s.\n", kRemoteTestFile.c_str());
+    s->create_file(create_file_request{kRemoteTestFile, false},
                    LPC_TEST_HDFS,
                    [&cf_resp](const create_file_response &resp) { cf_resp = 
resp; },
                    nullptr)
         ->wait();
     ASSERT_EQ(dsn::ERR_OK, cf_resp.err);
-    ASSERT_EQ(test_file_size, cf_resp.file_handle->get_size());
-    std::string local_file_for_download = "test_file_d";
+    ASSERT_EQ(local_file_size, cf_resp.file_handle->get_size());
+    std::string kLocalDownloadFile = "test_file_d";
     cf_resp.file_handle
-        ->download(download_request{local_file_for_download, 0, -1},
+        ->download(download_request{kLocalDownloadFile, 0, -1},
                    LPC_TEST_HDFS,
                    [&d_resp](const download_response &resp) { d_resp = resp; },
                    nullptr)
         ->wait();
     ASSERT_EQ(dsn::ERR_OK, d_resp.err);
-    ASSERT_EQ(test_file_size, d_resp.downloaded_size);
-
-    // compare local_test_file and local_file_for_download.
-    int64_t file_size = 0;
-    dsn::utils::filesystem::file_size(local_file_for_download, file_size);
-    ASSERT_EQ(test_file_size, file_size);
-    std::string test_file_md5sum;
-    dsn::utils::filesystem::md5sum(local_test_file, test_file_md5sum);
-    std::string downloaded_file_md5sum;
-    dsn::utils::filesystem::md5sum(local_file_for_download, 
downloaded_file_md5sum);
-    ASSERT_EQ(test_file_md5sum, downloaded_file_md5sum);
-
-    // test read and write.
-    printf("test read write operation.\n");
-    std::string test_write_file = "hdfs_client_test/test_write_file";
-    s->create_file(create_file_request{test_write_file, false},
-                   LPC_TEST_HDFS,
-                   [&cf_resp](const create_file_response &r) { cf_resp = r; },
-                   nullptr)
-        ->wait();
-    ASSERT_EQ(dsn::ERR_OK, cf_resp.err);
-    const char *test_buffer = "write_hello_world_for_test";
-    int length = strlen(test_buffer);
-    dsn::blob bb(test_buffer, 0, length);
-    write_response w_resp;
-    cf_resp.file_handle
-        ->write(write_request{bb},
-                LPC_TEST_HDFS,
-                [&w_resp](const write_response &w) { w_resp = w; },
-                nullptr)
-        ->wait();
-    ASSERT_EQ(dsn::ERR_OK, w_resp.err);
-    ASSERT_EQ(length, w_resp.written_size);
-    ASSERT_EQ(length, cf_resp.file_handle->get_size());
-    printf("test read just written contents.\n");
-    read_response r_resp;
-    cf_resp.file_handle
-        ->read(read_request{0, -1},
-               LPC_TEST_HDFS,
-               [&r_resp](const read_response &r) { r_resp = r; },
-               nullptr)
-        ->wait();
-    ASSERT_EQ(dsn::ERR_OK, r_resp.err);
-    ASSERT_EQ(length, r_resp.buffer.length());
-    ASSERT_TRUE(dsn::utils::mequals(r_resp.buffer.data(), test_buffer, 
length));
-
-    // test partitial read.
-    cf_resp.file_handle
-        ->read(read_request{5, 10},
-               LPC_TEST_HDFS,
-               [&r_resp](const read_response &r) { r_resp = r; },
-               nullptr)
-        ->wait();
-    ASSERT_EQ(dsn::ERR_OK, r_resp.err);
-    ASSERT_EQ(10, r_resp.buffer.length());
-    ASSERT_TRUE(dsn::utils::mequals(r_resp.buffer.data(), test_buffer + 5, 
10));
-
-    // clean up local test files.
-    utils::filesystem::remove_path(local_test_file);
-    utils::filesystem::remove_path(local_file_for_download);
+    ASSERT_EQ(local_file_size, d_resp.downloaded_size);
+
+    // 6. compare kLocalFile and kLocalDownloadFile.
+    // 6.1 check file size.
+    int64_t local_download_file_size = 0;
+    ASSERT_TRUE(dsn::utils::filesystem::file_size(
+        kLocalDownloadFile, dsn::utils::FileDataType::kSensitive, 
local_download_file_size));
+    ASSERT_EQ(local_file_size, local_download_file_size);
+    // 6.2 check file md5sum.
+    std::string local_download_file_md5sum;
+    dsn::utils::filesystem::md5sum(kLocalDownloadFile, 
local_download_file_md5sum);
+    ASSERT_EQ(local_file_md5sum, local_download_file_md5sum);
+
+    // 7. clean up local test files.
+    utils::filesystem::remove_path(kLocalFile);
+    utils::filesystem::remove_path(kLocalDownloadFile);
 }
 
-TEST_F(HDFSClientTest, test_concurrent_upload_download)
+TEST_P(HDFSClientTest, test_concurrent_upload_download)
 {
-    if (name_node == example_name_node || backup_path == example_backup_path) {
+    if (strlen(FLAGS_test_name_node) == 0 || strlen(FLAGS_test_backup_path) == 
0) {
+        // TODO(yingchun): use GTEST_SKIP after upgrading gtest.
+        std::cout << "Set hdfs_test.* configs in config-test.ini to enable 
hdfs_service_test."
+                  << std::endl;
         return;
     }
 
-    std::vector<std::string> args = {name_node, backup_path};
-    std::shared_ptr<hdfs_service> s = std::make_shared<hdfs_service>();
-    ASSERT_EQ(dsn::ERR_OK, s->initialize(args));
+    auto s = std::make_shared<hdfs_service>();
+    ASSERT_EQ(dsn::ERR_OK, s->initialize({FLAGS_test_name_node, 
FLAGS_test_backup_path}));
 
     int total_files = FLAGS_num_total_files_for_hdfs_concurrent_test;
     std::vector<std::string> local_file_names;
@@ -282,11 +334,12 @@ TEST_F(HDFSClientTest, test_concurrent_upload_download)
     // generate test files.
     for (int i = 0; i < total_files; ++i) {
         std::string file_name = "randomfile" + std::to_string(i);
-        generate_test_file(file_name.c_str());
+        NO_FATALS(generate_test_file(file_name));
         int64_t file_size = 0;
-        dsn::utils::filesystem::file_size(file_name, file_size);
+        ASSERT_TRUE(dsn::utils::filesystem::file_size(
+            file_name, dsn::utils::FileDataType::kSensitive, file_size));
         std::string md5sum;
-        dsn::utils::filesystem::md5sum(file_name, md5sum);
+        ASSERT_EQ(ERR_OK, dsn::utils::filesystem::md5sum(file_name, md5sum));
 
         local_file_names.emplace_back(file_name);
         remote_file_names.emplace_back("hdfs_concurrent_test/" + file_name);
@@ -385,13 +438,22 @@ TEST_F(HDFSClientTest, test_concurrent_upload_download)
     }
 }
 
-TEST_F(HDFSClientTest, test_rename_path_while_writing)
+TEST_P(HDFSClientTest, test_rename_path_while_writing)
 {
+    std::string kLocalTestPath = "test_dir";
+    const int kTotalFiles = 100;
+
     task_tracker tracker;
-    write_test_files_async(&tracker);
+    int success_count = 0;
+    write_test_files_async(kLocalTestPath, kTotalFiles, &success_count, 
&tracker);
     usleep(100);
-    std::string rename_dir = "rename_dir." + std::to_string(dsn_now_ms());
-    // rename succeed but writing failed.
-    ASSERT_TRUE(dsn::utils::filesystem::rename_path(local_test_dir, 
rename_dir));
+
+    std::string kLocalRenamedTestPath = "rename_dir." + 
std::to_string(dsn_now_ms());
+    // The rename succeeds, but some of the file writes fail.
+    ASSERT_TRUE(dsn::utils::filesystem::rename_path(kLocalTestPath, 
kLocalRenamedTestPath));
     tracker.cancel_outstanding_tasks();
+    // Generally, we can assume that some (but not all) of the files fail to be written.
+    // This may be flaky; please retry if it fails.
+    ASSERT_GT(success_count, 0) << success_count;
+    ASSERT_LT(success_count, kTotalFiles) << success_count;
 }
diff --git a/src/replica/CMakeLists.txt b/src/replica/CMakeLists.txt
index 7e27ef9b7..d4609d9e5 100644
--- a/src/replica/CMakeLists.txt
+++ b/src/replica/CMakeLists.txt
@@ -75,7 +75,7 @@ set(MY_PROJ_LIBS
     PocoFoundation
     PocoNetSSL
     PocoJSON
-    )
+    rocksdb)
 
 set(MY_BOOST_LIBS Boost::filesystem Boost::regex)
 
diff --git a/src/utils/env.cpp b/src/utils/env.cpp
index 329406117..7e49d999a 100644
--- a/src/utils/env.cpp
+++ b/src/utils/env.cpp
@@ -49,6 +49,12 @@ DSN_DEFINE_string(pegasus.server,
                   "The encryption method to use in the filesystem. Now "
                   "supports AES128CTR, AES192CTR, AES256CTR and SM4CTR.");
 
+DSN_DEFINE_bool(replication,
+                enable_direct_io,
+                false,
+                "Whether to enable direct I/O when download files");
+DSN_TAG_VARIABLE(enable_direct_io, FT_MUTABLE);
+
 namespace dsn {
 namespace utils {
 
@@ -89,6 +95,7 @@ rocksdb::Status do_copy_file(const std::string &src_fname,
                              uint64_t *total_size)
 {
     rocksdb::EnvOptions src_env_options;
+    src_env_options.use_direct_reads = FLAGS_enable_direct_io;
     std::unique_ptr<rocksdb::SequentialFile> src_file;
     auto s =
         dsn::utils::PegasusEnv(src_type)->NewSequentialFile(src_fname, 
&src_file, src_env_options);
@@ -105,6 +112,7 @@ rocksdb::Status do_copy_file(const std::string &src_fname,
     }
 
     rocksdb::EnvOptions dst_env_options;
+    dst_env_options.use_direct_writes = FLAGS_enable_direct_io;
     std::unique_ptr<rocksdb::WritableFile> dst_file;
     s = dsn::utils::PegasusEnv(dst_type)->NewWritableFile(dst_fname, 
&dst_file, dst_env_options);
     LOG_AND_RETURN_NOT_RDB_OK(WARNING, s, "failed to open file {} for 
writing", dst_fname);
diff --git a/src/utils/env.h b/src/utils/env.h
index 839bd81ab..cdaf16aaa 100644
--- a/src/utils/env.h
+++ b/src/utils/env.h
@@ -19,8 +19,8 @@
 
 #include <rocksdb/env.h>
 #include <rocksdb/status.h>
-#include <stddef.h>
-#include <stdint.h>
+#include <cstddef>
+#include <cstdint>
 #include <string>
 
 namespace dsn {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to