This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f92716dd feat(aio): use rocksdb APIs to re-implement the aio module 
(#1637)
0f92716dd is described below

commit 0f92716dd3bfaec85009ffad1fe3efa541e620b5
Author: Yingchun Lai <[email protected]>
AuthorDate: Mon Oct 16 12:30:37 2023 +0800

    feat(aio): use rocksdb APIs to re-implement the aio module (#1637)
    
    https://github.com/apache/incubator-pegasus/issues/1575
    
    This is prerequisite work for implementing encryption at rest: after this
    patch we can use the encryption capability of rocksdb.
    
    - Use rocksdb APIs to implement class `native_linux_aio_provider`. Both
      implementations use the `pread()` and `pwrite()` system calls, so there is
      no significant performance change; see the newly added simple benchmark
      performance comparison below.
    - Separate the file read and write operations for class `aio_provider`
---
 src/aio/CMakeLists.txt                             |   2 +-
 src/aio/aio_provider.h                             |  16 ++-
 src/aio/disk_engine.cpp                            |   7 +-
 src/aio/disk_engine.h                              |  11 +-
 src/aio/file_io.cpp                                |  48 ++++++--
 src/aio/file_io.h                                  |  10 +-
 src/aio/native_linux_aio_provider.cpp              | 132 +++++++++++----------
 src/aio/native_linux_aio_provider.h                |  19 ++-
 src/aio/test/CMakeLists.txt                        |   2 +-
 src/aio/test/aio.cpp                               |  75 ++++++++----
 src/aio/test/config.ini                            |   5 +
 src/meta/meta_state_service_simple.cpp             |   4 +-
 src/nfs/nfs_client_impl.cpp                        |   9 +-
 src/nfs/nfs_server_impl.cpp                        |   4 +-
 .../test/load_from_private_log_test.cpp            |   5 +-
 src/replica/log_file.cpp                           |   8 +-
 .../storage/simple_kv/simple_kv.server.impl.cpp    |   4 +-
 .../simple_kv/test/simple_kv.server.impl.cpp       |   4 +-
 src/replica/test/mutation_log_test.cpp             |   3 +-
 src/runtime/test/CMakeLists.txt                    |   1 +
 src/runtime/test/task_test.cpp                     |   4 +-
 src/test_util/test_util.cpp                        |   1 -
 src/test_util/test_util.h                          |  24 +++-
 src/utils/long_adder_bench/long_adder_bench.cpp    |  23 ++--
 src/utils/test/env.cpp                             |   2 +-
 25 files changed, 265 insertions(+), 158 deletions(-)

diff --git a/src/aio/CMakeLists.txt b/src/aio/CMakeLists.txt
index 45d24cf68..3754361d0 100644
--- a/src/aio/CMakeLists.txt
+++ b/src/aio/CMakeLists.txt
@@ -33,7 +33,7 @@ set(MY_PROJ_SRC "")
 #"GLOB" for non - recursive search
 set(MY_SRC_SEARCH_MODE "GLOB")
 
-set(MY_PROJ_LIBS dsn_runtime)
+set(MY_PROJ_LIBS dsn_runtime rocksdb)
 
 #Extra files that will be installed
 set(MY_BINPLACES "")
diff --git a/src/aio/aio_provider.h b/src/aio/aio_provider.h
index 73848d87b..74fb410bd 100644
--- a/src/aio/aio_provider.h
+++ b/src/aio/aio_provider.h
@@ -27,10 +27,17 @@
 #pragma once
 
 #include <stdint.h>
+#include <memory>
+#include <string>
 
 #include "utils/error_code.h"
 #include "utils/factory_store.h"
 
+namespace rocksdb {
+class RandomAccessFile;
+class RandomRWFile;
+} // namespace rocksdb
+
 namespace dsn {
 
 class aio_context;
@@ -60,12 +67,13 @@ public:
     explicit aio_provider(disk_engine *disk);
     virtual ~aio_provider() = default;
 
-    virtual linux_fd_t open(const char *file_name, int flag, int pmode) = 0;
+    virtual std::unique_ptr<rocksdb::RandomAccessFile> open_read_file(const 
std::string &fname) = 0;
+    virtual error_code read(const aio_context &aio_ctx, /*out*/ uint64_t 
*processed_bytes) = 0;
 
-    virtual error_code close(linux_fd_t fd) = 0;
-    virtual error_code flush(linux_fd_t fd) = 0;
+    virtual std::unique_ptr<rocksdb::RandomRWFile> open_write_file(const 
std::string &fname) = 0;
     virtual error_code write(const aio_context &aio_ctx, /*out*/ uint64_t 
*processed_bytes) = 0;
-    virtual error_code read(const aio_context &aio_ctx, /*out*/ uint64_t 
*processed_bytes) = 0;
+    virtual error_code flush(rocksdb::RandomRWFile *rwf) = 0;
+    virtual error_code close(rocksdb::RandomRWFile *rwf) = 0;
 
     // Submits the aio_task to the underlying disk-io executor.
     // This task may not be executed immediately, call `aio_task::wait`
diff --git a/src/aio/disk_engine.cpp b/src/aio/disk_engine.cpp
index 0e25bfaac..1b104be30 100644
--- a/src/aio/disk_engine.cpp
+++ b/src/aio/disk_engine.cpp
@@ -102,22 +102,26 @@ aio_task *disk_write_queue::unlink_next_workload(void 
*plength)
     return first;
 }
 
-disk_file::disk_file(linux_fd_t fd) : _fd(fd) {}
+disk_file::disk_file(std::unique_ptr<rocksdb::RandomAccessFile> rf) : 
_read_file(std::move(rf)) {}
+disk_file::disk_file(std::unique_ptr<rocksdb::RandomRWFile> wf) : 
_write_file(std::move(wf)) {}
 
 aio_task *disk_file::read(aio_task *tsk)
 {
+    CHECK(_read_file, "");
     tsk->add_ref(); // release on completion, see `on_read_completed`.
     return _read_queue.add_work(tsk, nullptr);
 }
 
 aio_task *disk_file::write(aio_task *tsk, void *ctx)
 {
+    CHECK(_write_file, "");
     tsk->add_ref(); // release on completion
     return _write_queue.add_work(tsk, ctx);
 }
 
 aio_task *disk_file::on_read_completed(aio_task *wk, error_code err, size_t 
size)
 {
+    CHECK(_read_file, "");
     CHECK(wk->next == nullptr, "");
     auto ret = _read_queue.on_work_completed(wk, nullptr);
     wk->enqueue(err, size);
@@ -128,6 +132,7 @@ aio_task *disk_file::on_read_completed(aio_task *wk, 
error_code err, size_t size
 
 aio_task *disk_file::on_write_completed(aio_task *wk, void *ctx, error_code 
err, size_t size)
 {
+    CHECK(_write_file, "");
     auto ret = _write_queue.on_work_completed(wk, ctx);
 
     while (wk) {
diff --git a/src/aio/disk_engine.h b/src/aio/disk_engine.h
index 8eb784635..70cce466a 100644
--- a/src/aio/disk_engine.h
+++ b/src/aio/disk_engine.h
@@ -32,6 +32,7 @@
 
 #include "aio/aio_task.h"
 #include "aio_provider.h"
+#include "rocksdb/env.h"
 #include "utils/singleton.h"
 #include "utils/work_queue.h"
 
@@ -56,17 +57,21 @@ private:
 class disk_file
 {
 public:
-    explicit disk_file(linux_fd_t fd);
+    explicit disk_file(std::unique_ptr<rocksdb::RandomAccessFile> rf);
+    explicit disk_file(std::unique_ptr<rocksdb::RandomRWFile> wf);
     aio_task *read(aio_task *tsk);
     aio_task *write(aio_task *tsk, void *ctx);
 
     aio_task *on_read_completed(aio_task *wk, error_code err, size_t size);
     aio_task *on_write_completed(aio_task *wk, void *ctx, error_code err, 
size_t size);
 
-    linux_fd_t native_handle() const { return _fd; }
+    rocksdb::RandomAccessFile *rfile() const { return _read_file.get(); }
+    rocksdb::RandomRWFile *wfile() const { return _write_file.get(); }
 
 private:
-    linux_fd_t _fd;
+    // TODO(yingchun): unify to use a single RandomRWFile member variable.
+    std::unique_ptr<rocksdb::RandomAccessFile> _read_file;
+    std::unique_ptr<rocksdb::RandomRWFile> _write_file;
     disk_write_queue _write_queue;
     work_queue<aio_task> _read_queue;
 };
diff --git a/src/aio/file_io.cpp b/src/aio/file_io.cpp
index 4f3c10bb5..a4bf26ba8 100644
--- a/src/aio/file_io.cpp
+++ b/src/aio/file_io.cpp
@@ -26,32 +26,51 @@
 
 #include "aio/file_io.h"
 
+#include <memory>
 // IWYU pragma: no_include <algorithm>
 #include <vector>
 
 #include "aio/aio_provider.h"
 #include "disk_engine.h"
+#include "rocksdb/env.h"
+#include "utils/fmt_logging.h"
 
 namespace dsn {
 class task_tracker;
 
 namespace file {
 
-/*extern*/ disk_file *open(const char *file_name, int flag, int pmode)
+/*extern*/ disk_file *open(const std::string &fname, FileOpenType type)
 {
-    auto fd = disk_engine::provider().open(file_name, flag, pmode);
-    if (fd.is_invalid()) {
-        return nullptr;
+    switch (type) {
+    case FileOpenType::kReadOnly: {
+        auto sf = disk_engine::provider().open_read_file(fname);
+        if (!sf) {
+            return nullptr;
+        }
+        return new disk_file(std::move(sf));
     }
-
-    return new disk_file(fd);
+    case FileOpenType::kWriteOnly: {
+        auto wf = disk_engine::provider().open_write_file(fname);
+        if (!wf) {
+            return nullptr;
+        }
+        return new disk_file(std::move(wf));
+    }
+    default:
+        CHECK(false, "");
+    }
+    return nullptr;
 }
 
 /*extern*/ error_code close(disk_file *file)
 {
-    error_code result = ERR_INVALID_HANDLE;
+    error_code result = ERR_OK;
     if (file != nullptr) {
-        result = disk_engine::provider().close(file->native_handle());
+        // A read file does not need to be closed.
+        if (file->wfile()) {
+            result = disk_engine::provider().close(file->wfile());
+        }
         delete file;
         file = nullptr;
     }
@@ -60,11 +79,11 @@ namespace file {
 
 /*extern*/ error_code flush(disk_file *file)
 {
-    if (nullptr != file) {
-        return disk_engine::provider().flush(file->native_handle());
-    } else {
+    if (file == nullptr || file->wfile() == nullptr) {
         return ERR_INVALID_HANDLE;
     }
+
+    return disk_engine::provider().flush(file->wfile());
 }
 
 /*extern*/ aio_task_ptr read(disk_file *file,
@@ -84,7 +103,8 @@ namespace file {
     cb->get_aio_context()->engine = &disk_engine::instance();
     cb->get_aio_context()->dfile = file;
 
-    if (!cb->spec().on_aio_call.execute(task::get_current_task(), cb, true)) {
+    if (!cb->spec().on_aio_call.execute(task::get_current_task(), cb, true) ||
+        file->rfile() == nullptr) {
         cb->enqueue(ERR_FILE_OPERATION_FAILED, 0);
         return cb;
     }
@@ -110,6 +130,10 @@ namespace file {
     cb->get_aio_context()->file_offset = offset;
     cb->get_aio_context()->type = AIO_Write;
     cb->get_aio_context()->dfile = file;
+    if (file->wfile() == nullptr) {
+        cb->enqueue(ERR_FILE_OPERATION_FAILED, 0);
+        return cb;
+    }
 
     disk_engine::instance().write(cb);
     return cb;
diff --git a/src/aio/file_io.h b/src/aio/file_io.h
index f0b6ffc42..0cef3f1b8 100644
--- a/src/aio/file_io.h
+++ b/src/aio/file_io.h
@@ -28,6 +28,7 @@
 
 #include <stdint.h>
 #include <list>
+#include <string>
 #include <utility>
 
 #include "aio/aio_task.h"
@@ -47,6 +48,13 @@ class task_tracker;
 
 namespace file {
 
+enum class FileOpenType
+{
+    kReadOnly = 0,
+    kWriteOnly
+};
+
+// TODO(yingchun): consider to return a smart pointer
 /// open file
 ///
 /// \param file_name filename of the file.
@@ -55,7 +63,7 @@ namespace file {
 ///
 /// \return file handle
 ///
-extern disk_file *open(const char *file_name, int flag, int pmode);
+extern disk_file *open(const std::string &fname, FileOpenType type);
 
 /// close the file handle
 extern error_code close(disk_file *file);
diff --git a/src/aio/native_linux_aio_provider.cpp 
b/src/aio/native_linux_aio_provider.cpp
index 56a0efa1e..7270dff73 100644
--- a/src/aio/native_linux_aio_provider.cpp
+++ b/src/aio/native_linux_aio_provider.cpp
@@ -26,19 +26,17 @@
 
 #include "native_linux_aio_provider.h"
 
-#include <errno.h>
-#include <fcntl.h>
-#include <unistd.h>
-#include <memory>
-
 #include "aio/aio_provider.h"
 #include "aio/disk_engine.h"
+#include "rocksdb/env.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
 #include "runtime/service_engine.h"
 #include "runtime/task/async_calls.h"
+#include "utils/env.h"
 #include "utils/fmt_logging.h"
 #include "utils/latency_tracer.h"
 #include "utils/ports.h"
-#include "utils/safe_strerror_posix.h"
 
 namespace dsn {
 
@@ -46,87 +44,101 @@ 
native_linux_aio_provider::native_linux_aio_provider(disk_engine *disk) : aio_pr
 
 native_linux_aio_provider::~native_linux_aio_provider() {}
 
-linux_fd_t native_linux_aio_provider::open(const char *file_name, int flag, 
int pmode)
+std::unique_ptr<rocksdb::RandomAccessFile>
+native_linux_aio_provider::open_read_file(const std::string &fname)
 {
-    auto fd = ::open(file_name, flag, pmode);
-    if (fd == DSN_INVALID_FILE_HANDLE) {
-        LOG_ERROR("create file '{}' failed, err = {}", file_name, 
utils::safe_strerror(errno));
+    std::unique_ptr<rocksdb::RandomAccessFile> rfile;
+    auto s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)
+                 ->NewRandomAccessFile(fname, &rfile, rocksdb::EnvOptions());
+    if (!s.ok()) {
+        LOG_ERROR("open read file '{}' failed, err = {}", fname, s.ToString());
     }
-    return linux_fd_t(fd);
+    return rfile;
 }
 
-error_code native_linux_aio_provider::close(linux_fd_t fd)
+std::unique_ptr<rocksdb::RandomRWFile>
+native_linux_aio_provider::open_write_file(const std::string &fname)
 {
-    if (fd.is_invalid() || ::close(fd.fd) == 0) {
-        return ERR_OK;
+    // rocksdb::NewRandomRWFile() doesn't behave as its docs describe: it will not create the
+    // file if it does not exist, and an error Status will be returned instead. So we create
+    // the file via ReopenWritableFile() first if it does not exist.
+    auto s = 
dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)->FileExists(fname);
+    if (!s.ok() && !s.IsNotFound()) {
+        LOG_ERROR("failed to check whether the file '{}' exist, err = {}", 
fname, s.ToString());
+        return nullptr;
+    }
+
+    if (s.IsNotFound()) {
+        std::unique_ptr<rocksdb::WritableFile> cfile;
+        s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)
+                ->ReopenWritableFile(fname, &cfile, rocksdb::EnvOptions());
+        if (!s.ok()) {
+            LOG_ERROR("failed to create file '{}', err = {}", fname, 
s.ToString());
+            return nullptr;
+        }
     }
 
-    LOG_ERROR("close file failed, err = {}", utils::safe_strerror(errno));
-    return ERR_FILE_OPERATION_FAILED;
+    // Open the file for write as RandomRWFile, to support un-sequential write.
+    std::unique_ptr<rocksdb::RandomRWFile> wfile;
+    s = dsn::utils::PegasusEnv(dsn::utils::FileDataType::kSensitive)
+            ->NewRandomRWFile(fname, &wfile, rocksdb::EnvOptions());
+    if (!s.ok()) {
+        LOG_ERROR("open write file '{}' failed, err = {}", fname, 
s.ToString());
+    }
+    return wfile;
 }
 
-error_code native_linux_aio_provider::flush(linux_fd_t fd)
+error_code native_linux_aio_provider::close(rocksdb::RandomRWFile *wf)
 {
-    if (fd.is_invalid() || ::fsync(fd.fd) == 0) {
-        return ERR_OK;
+    auto s = wf->Close();
+    if (!s.ok()) {
+        LOG_ERROR("close file failed, err = {}", s.ToString());
+        return ERR_FILE_OPERATION_FAILED;
     }
 
-    LOG_ERROR("flush file failed, err = {}", utils::safe_strerror(errno));
-    return ERR_FILE_OPERATION_FAILED;
+    return ERR_OK;
+}
+
+error_code native_linux_aio_provider::flush(rocksdb::RandomRWFile *wf)
+{
+    auto s = wf->Fsync();
+    if (!s.ok()) {
+        LOG_ERROR("flush file failed, err = {}", s.ToString());
+        return ERR_FILE_OPERATION_FAILED;
+    }
+
+    return ERR_OK;
 }
 
 error_code native_linux_aio_provider::write(const aio_context &aio_ctx,
                                             /*out*/ uint64_t *processed_bytes)
 {
-    dsn::error_code resp = ERR_OK;
-    uint64_t buffer_offset = 0;
-    do {
-        // ret is the written data size
-        auto ret = ::pwrite(aio_ctx.dfile->native_handle().fd,
-                            (char *)aio_ctx.buffer + buffer_offset,
-                            aio_ctx.buffer_size - buffer_offset,
-                            aio_ctx.file_offset + buffer_offset);
-        if (dsn_unlikely(ret < 0)) {
-            if (errno == EINTR) {
-                LOG_WARNING("write failed with errno={} and will retry it.",
-                            utils::safe_strerror(errno));
-                continue;
-            }
-            resp = ERR_FILE_OPERATION_FAILED;
-            LOG_ERROR("write failed with errno={}, return {}.", 
utils::safe_strerror(errno), resp);
-            return resp;
-        }
-
-        buffer_offset += ret;
-        if (dsn_unlikely(buffer_offset != aio_ctx.buffer_size)) {
-            LOG_WARNING(
-                "write incomplete, request_size={}, total_write_size={}, 
this_write_size={}, "
-                "and will retry it.",
-                aio_ctx.buffer_size,
-                buffer_offset,
-                ret);
-        }
-    } while (dsn_unlikely(buffer_offset < aio_ctx.buffer_size));
+    rocksdb::Slice data((const char *)(aio_ctx.buffer), aio_ctx.buffer_size);
+    auto s = aio_ctx.dfile->wfile()->Write(aio_ctx.file_offset, data);
+    if (!s.ok()) {
+        LOG_ERROR("write file failed, err = {}", s.ToString());
+        return ERR_FILE_OPERATION_FAILED;
+    }
 
-    *processed_bytes = buffer_offset;
-    return resp;
+    *processed_bytes = aio_ctx.buffer_size;
+    return ERR_OK;
 }
 
 error_code native_linux_aio_provider::read(const aio_context &aio_ctx,
                                            /*out*/ uint64_t *processed_bytes)
 {
-    auto ret = ::pread(aio_ctx.dfile->native_handle().fd,
-                       aio_ctx.buffer,
-                       aio_ctx.buffer_size,
-                       aio_ctx.file_offset);
-    if (dsn_unlikely(ret < 0)) {
-        LOG_WARNING("write failed with errno={} and will retry it.", 
utils::safe_strerror(errno));
+    rocksdb::Slice result;
+    auto s = aio_ctx.dfile->rfile()->Read(
+        aio_ctx.file_offset, aio_ctx.buffer_size, &result, (char 
*)(aio_ctx.buffer));
+    if (!s.ok()) {
+        LOG_ERROR("read file failed, err = {}", s.ToString());
         return ERR_FILE_OPERATION_FAILED;
     }
-    if (ret == 0) {
+
+    if (result.empty()) {
         return ERR_HANDLE_EOF;
     }
-    *processed_bytes = static_cast<uint64_t>(ret);
+    *processed_bytes = result.size();
     return ERR_OK;
 }
 
diff --git a/src/aio/native_linux_aio_provider.h 
b/src/aio/native_linux_aio_provider.h
index bdb1339b9..538b808df 100644
--- a/src/aio/native_linux_aio_provider.h
+++ b/src/aio/native_linux_aio_provider.h
@@ -27,11 +27,18 @@
 #pragma once
 
 #include <stdint.h>
+#include <memory>
+#include <string>
 
 #include "aio/aio_task.h"
 #include "aio_provider.h"
 #include "utils/error_code.h"
 
+namespace rocksdb {
+class RandomAccessFile;
+class RandomRWFile;
+} // namespace rocksdb
+
 namespace dsn {
 class disk_engine;
 
@@ -41,16 +48,18 @@ public:
     explicit native_linux_aio_provider(disk_engine *disk);
     ~native_linux_aio_provider() override;
 
-    linux_fd_t open(const char *file_name, int flag, int pmode) override;
-    error_code close(linux_fd_t fd) override;
-    error_code flush(linux_fd_t fd) override;
-    error_code write(const aio_context &aio_ctx, /*out*/ uint64_t 
*processed_bytes) override;
+    std::unique_ptr<rocksdb::RandomAccessFile> open_read_file(const 
std::string &fname) override;
     error_code read(const aio_context &aio_ctx, /*out*/ uint64_t 
*processed_bytes) override;
 
+    std::unique_ptr<rocksdb::RandomRWFile> open_write_file(const std::string 
&fname) override;
+    error_code write(const aio_context &aio_ctx, /*out*/ uint64_t 
*processed_bytes) override;
+    error_code flush(rocksdb::RandomRWFile *wf) override;
+    error_code close(rocksdb::RandomRWFile *wf) override;
+
     void submit_aio_task(aio_task *aio) override;
     aio_context *prepare_aio_context(aio_task *tsk) override { return new 
aio_context; }
 
-protected:
+private:
     error_code aio_internal(aio_task *aio);
 };
 
diff --git a/src/aio/test/CMakeLists.txt b/src/aio/test/CMakeLists.txt
index c1b0a44e4..357499a9c 100644
--- a/src/aio/test/CMakeLists.txt
+++ b/src/aio/test/CMakeLists.txt
@@ -33,7 +33,7 @@ set(MY_PROJ_SRC "")
 # "GLOB" for non-recursive search
 set(MY_SRC_SEARCH_MODE "GLOB")
 
-set(MY_PROJ_LIBS gtest dsn_runtime dsn_aio rocksdb)
+set(MY_PROJ_LIBS gtest dsn_runtime dsn_aio test_utils rocksdb)
 
 set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex)
 
diff --git a/src/aio/test/aio.cpp b/src/aio/test/aio.cpp
index a95cbeb8f..fa6b0114a 100644
--- a/src/aio/test/aio.cpp
+++ b/src/aio/test/aio.cpp
@@ -24,7 +24,7 @@
  * THE SOFTWARE.
  */
 
-#include <fcntl.h>
+#include <fmt/core.h>
 // IWYU pragma: no_include <gtest/gtest-param-test.h>
 // IWYU pragma: no_include <gtest/gtest-message.h>
 // IWYU pragma: no_include <gtest/gtest-test-part.h>
@@ -48,11 +48,25 @@
 #include "utils/env.h"
 #include "utils/error_code.h"
 #include "utils/filesystem.h"
+#include "utils/flags.h"
 #include "utils/fmt_logging.h"
-#include "utils/ports.h"
 #include "utils/test_macros.h"
 #include "utils/threadpool_code.h"
 
+DSN_DEFINE_uint32(aio_test,
+                  op_buffer_size,
+                  12,
+                  "The buffer size of each aio read or write operation for the 
aio_test.basic");
+DSN_DEFINE_uint32(aio_test,
+                  total_op_count,
+                  100,
+                  "The total count of read or write operations for the 
aio_test.basic");
+DSN_DEFINE_uint32(
+    aio_test,
+    op_count_per_batch,
+    10,
+    "The operation count of per read or write batch operation for the 
aio_test.basic");
+
 using namespace ::dsn;
 
 DEFINE_THREAD_POOL_CODE(THREAD_POOL_TEST_SERVER)
@@ -62,6 +76,7 @@ class aio_test : public pegasus::encrypt_data_test_base
 {
 public:
     void SetUp() override { utils::filesystem::remove_path(kTestFileName); }
+    void TearDown() override { utils::filesystem::remove_path(kTestFileName); }
 
     const std::string kTestFileName = "aio_test.txt";
 };
@@ -71,10 +86,10 @@ INSTANTIATE_TEST_CASE_P(, aio_test, 
::testing::Values(false));
 
 TEST_P(aio_test, basic)
 {
-    const char *kUnitBuffer = "hello, world";
-    const size_t kUnitBufferLength = strlen(kUnitBuffer);
-    const int kTotalBufferCount = 100;
-    const int kBufferCountPerBatch = 10;
+    const size_t kUnitBufferLength = FLAGS_op_buffer_size;
+    const std::string kUnitBuffer(kUnitBufferLength, 'x');
+    const int kTotalBufferCount = FLAGS_total_op_count;
+    const int kBufferCountPerBatch = FLAGS_op_count_per_batch;
     const int64_t kFileSize = kUnitBufferLength * kTotalBufferCount;
     ASSERT_EQ(0, kTotalBufferCount % kBufferCountPerBatch);
 
@@ -86,15 +101,16 @@ TEST_P(aio_test, basic)
     auto verify_data = [=]() {
         int64_t file_size;
         ASSERT_TRUE(utils::filesystem::file_size(
-            kTestFileName.c_str(), dsn::utils::FileDataType::kSensitive, 
file_size));
+            kTestFileName, dsn::utils::FileDataType::kSensitive, file_size));
         ASSERT_EQ(kFileSize, file_size);
 
         // Create a read file handler.
-        auto rfile = file::open(kTestFileName.c_str(), O_RDONLY | O_BINARY, 0);
+        auto rfile = file::open(kTestFileName, file::FileOpenType::kReadOnly);
         ASSERT_NE(rfile, nullptr);
 
         // 1. Check sequential read.
         {
+            pegasus::stop_watch sw;
             uint64_t offset = 0;
             std::list<aio_task_ptr> tasks;
             for (int i = 0; i < kTotalBufferCount; i++) {
@@ -111,12 +127,14 @@ TEST_P(aio_test, basic)
 
                 t->wait();
                 ASSERT_EQ(kUnitBufferLength, t->get_transferred_size());
-                ASSERT_STREQ(kUnitBuffer, read_buffer);
+                ASSERT_STREQ(kUnitBuffer.c_str(), read_buffer);
             }
+            sw.stop_and_output(fmt::format("sequential read"));
         }
 
         // 2. Check concurrent read.
         {
+            pegasus::stop_watch sw;
             uint64_t offset = 0;
             std::list<aio_task_ptr> tasks;
             char read_buffers[kTotalBufferCount][kUnitBufferLength + 1];
@@ -137,22 +155,24 @@ TEST_P(aio_test, basic)
                 ASSERT_EQ(kUnitBufferLength, t->get_transferred_size());
             }
             for (int i = 0; i < kTotalBufferCount; i++) {
-                ASSERT_STREQ(kUnitBuffer, read_buffers[i]);
+                ASSERT_STREQ(kUnitBuffer.c_str(), read_buffers[i]);
             }
+            sw.stop_and_output(fmt::format("concurrent read"));
         }
         ASSERT_EQ(ERR_OK, file::close(rfile));
     };
 
     // 1. Sequential write.
     {
-        auto wfile = file::open(kTestFileName.c_str(), O_RDWR | O_CREAT | 
O_BINARY, 0666);
+        pegasus::stop_watch sw;
+        auto wfile = file::open(kTestFileName, file::FileOpenType::kWriteOnly);
         ASSERT_NE(wfile, nullptr);
 
         uint64_t offset = 0;
         std::list<aio_task_ptr> tasks;
         for (int i = 0; i < kTotalBufferCount; i++) {
             auto t = ::dsn::file::write(wfile,
-                                        kUnitBuffer,
+                                        kUnitBuffer.c_str(),
                                         kUnitBufferLength,
                                         offset,
                                         LPC_AIO_TEST,
@@ -167,12 +187,14 @@ TEST_P(aio_test, basic)
         }
         ASSERT_EQ(ERR_OK, file::flush(wfile));
         ASSERT_EQ(ERR_OK, file::close(wfile));
+        sw.stop_and_output(fmt::format("sequential write"));
     }
     NO_FATALS(verify_data());
 
     // 2. Un-sequential write.
     {
-        auto wfile = file::open(kTestFileName.c_str(), O_RDWR | O_CREAT | 
O_BINARY, 0666);
+        pegasus::stop_watch sw;
+        auto wfile = file::open(kTestFileName, file::FileOpenType::kWriteOnly);
         ASSERT_NE(wfile, nullptr);
 
         std::vector<uint64_t> offsets;
@@ -188,7 +210,7 @@ TEST_P(aio_test, basic)
         std::list<aio_task_ptr> tasks;
         for (const auto &offset : offsets) {
             auto t = ::dsn::file::write(wfile,
-                                        kUnitBuffer,
+                                        kUnitBuffer.c_str(),
                                         kUnitBufferLength,
                                         offset,
                                         LPC_AIO_TEST,
@@ -202,19 +224,21 @@ TEST_P(aio_test, basic)
         }
         ASSERT_EQ(ERR_OK, file::flush(wfile));
         ASSERT_EQ(ERR_OK, file::close(wfile));
+        sw.stop_and_output(fmt::format("un-sequential write"));
     }
     NO_FATALS(verify_data());
 
     // 3. Overwrite.
     {
-        auto wfile = file::open(kTestFileName.c_str(), O_RDWR | O_CREAT | 
O_BINARY, 0666);
+        pegasus::stop_watch sw;
+        auto wfile = file::open(kTestFileName, file::FileOpenType::kWriteOnly);
         ASSERT_NE(wfile, nullptr);
 
         uint64_t offset = 0;
         std::list<aio_task_ptr> tasks;
         for (int i = 0; i < kTotalBufferCount; i++) {
             auto t = ::dsn::file::write(wfile,
-                                        kUnitBuffer,
+                                        kUnitBuffer.c_str(),
                                         kUnitBufferLength,
                                         offset,
                                         LPC_AIO_TEST,
@@ -229,19 +253,21 @@ TEST_P(aio_test, basic)
         }
         ASSERT_EQ(ERR_OK, file::flush(wfile));
         ASSERT_EQ(ERR_OK, file::close(wfile));
+        sw.stop_and_output(fmt::format("overwrite"));
     }
     NO_FATALS(verify_data());
 
     // 4. Vector write.
     {
-        auto wfile = file::open(kTestFileName.c_str(), O_RDWR | O_CREAT | 
O_BINARY, 0666);
+        pegasus::stop_watch sw;
+        auto wfile = file::open(kTestFileName, file::FileOpenType::kWriteOnly);
         ASSERT_NE(wfile, nullptr);
 
         uint64_t offset = 0;
         std::list<aio_task_ptr> tasks;
         std::unique_ptr<dsn_file_buffer_t[]> buffers(new 
dsn_file_buffer_t[kBufferCountPerBatch]);
         for (int i = 0; i < kBufferCountPerBatch; i++) {
-            buffers[i].buffer = static_cast<void *>(const_cast<char 
*>(kUnitBuffer));
+            buffers[i].buffer = static_cast<void *>(const_cast<char 
*>(kUnitBuffer.c_str()));
             buffers[i].size = kUnitBufferLength;
         }
         for (int i = 0; i < kTotalBufferCount / kBufferCountPerBatch; i++) {
@@ -264,16 +290,17 @@ TEST_P(aio_test, basic)
         }
         ASSERT_EQ(ERR_OK, file::flush(wfile));
         ASSERT_EQ(ERR_OK, file::close(wfile));
+        sw.stop_and_output(fmt::format("vector write"));
     }
     NO_FATALS(verify_data());
 }
 
 TEST_P(aio_test, aio_share)
 {
-    auto wfile = file::open(kTestFileName.c_str(), O_WRONLY | O_CREAT | 
O_BINARY, 0666);
+    auto wfile = file::open(kTestFileName, file::FileOpenType::kWriteOnly);
     ASSERT_NE(wfile, nullptr);
 
-    auto rfile = file::open(kTestFileName.c_str(), O_RDONLY | O_BINARY, 0);
+    auto rfile = file::open(kTestFileName, file::FileOpenType::kReadOnly);
     ASSERT_NE(rfile, nullptr);
 
     ASSERT_EQ(ERR_OK, file::close(wfile));
@@ -289,7 +316,7 @@ TEST_P(aio_test, operation_failed)
         *count = n;
     };
 
-    auto wfile = file::open(kTestFileName.c_str(), O_WRONLY | O_CREAT | 
O_BINARY, 0666);
+    auto wfile = file::open(kTestFileName, file::FileOpenType::kWriteOnly);
     ASSERT_NE(wfile, nullptr);
 
     char buff[512] = {0};
@@ -305,7 +332,7 @@ TEST_P(aio_test, operation_failed)
     t->wait();
     ASSERT_EQ(ERR_FILE_OPERATION_FAILED, *err);
 
-    auto rfile = file::open(kTestFileName.c_str(), O_RDONLY | O_BINARY, 0);
+    auto rfile = file::open(kTestFileName, file::FileOpenType::kReadOnly);
     ASSERT_NE(nullptr, rfile);
 
     t = ::dsn::file::read(rfile, buff, 512, 0, LPC_AIO_TEST, nullptr, 
io_callback, 0);
@@ -357,9 +384,9 @@ TEST_P(aio_test, dsn_file)
     ASSERT_EQ(ERR_OK, utils::filesystem::md5sum(src_file, src_file_md5));
     ASSERT_FALSE(src_file_md5.empty());
 
-    auto fin = file::open(src_file.c_str(), O_RDONLY | O_BINARY, 0);
+    auto fin = file::open(src_file, file::FileOpenType::kReadOnly);
     ASSERT_NE(nullptr, fin);
-    auto fout = file::open(dst_file.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666);
+    auto fout = file::open(dst_file, file::FileOpenType::kWriteOnly);
     ASSERT_NE(nullptr, fout);
     char kUnitBuffer[1024];
     uint64_t offset = 0;
diff --git a/src/aio/test/config.ini b/src/aio/test/config.ini
index 47bc9cf7f..fd46a38d7 100644
--- a/src/aio/test/config.ini
+++ b/src/aio/test/config.ini
@@ -43,3 +43,8 @@ tool = nativerun
 pause_on_start = false
 logging_start_level = LOG_LEVEL_DEBUG
 logging_factory_name = dsn::tools::simple_logger
+
+[aio_test]
+op_buffer_size = 12
+total_op_count = 100
+op_count_per_batch = 10
diff --git a/src/meta/meta_state_service_simple.cpp 
b/src/meta/meta_state_service_simple.cpp
index 6ba00176e..776cfba34 100644
--- a/src/meta/meta_state_service_simple.cpp
+++ b/src/meta/meta_state_service_simple.cpp
@@ -26,7 +26,6 @@
 
 #include "meta_state_service_simple.h"
 
-#include <fcntl.h>
 #include <string.h>
 #include <algorithm>
 #include <set>
@@ -44,7 +43,6 @@
 #include "utils/binary_reader.h"
 #include "utils/filesystem.h"
 #include "utils/fmt_logging.h"
-#include "utils/ports.h"
 #include "utils/strings.h"
 #include "utils/utils.h"
 
@@ -314,7 +312,7 @@ error_code meta_state_service_simple::initialize(const 
std::vector<std::string>
         }
     }
 
-    _log = file::open(log_path.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666);
+    _log = file::open(log_path, file::FileOpenType::kWriteOnly);
     if (!_log) {
         LOG_ERROR("open file failed: {}", log_path);
         return ERR_FILE_OPERATION_FAILED;
diff --git a/src/nfs/nfs_client_impl.cpp b/src/nfs/nfs_client_impl.cpp
index c0ced7b8c..910bae8ec 100644
--- a/src/nfs/nfs_client_impl.cpp
+++ b/src/nfs/nfs_client_impl.cpp
@@ -27,7 +27,6 @@
 #include "nfs_client_impl.h"
 
 // IWYU pragma: no_include <ext/alloc_traits.h>
-#include <fcntl.h>
 #include <mutex>
 
 #include "nfs/nfs_code_definition.h"
@@ -38,7 +37,6 @@
 #include "utils/filesystem.h"
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
-#include "utils/ports.h"
 #include "utils/string_conv.h"
 #include "utils/token_buckets.h"
 
@@ -460,8 +458,7 @@ void nfs_client_impl::continue_write()
         // double check
         zauto_lock l(fc->user_req->user_req_lock);
         if (!fc->file_holder->file_handle) {
-            fc->file_holder->file_handle =
-                file::open(file_path.c_str(), O_RDWR | O_CREAT | O_BINARY, 
0666);
+            fc->file_holder->file_handle = file::open(file_path, 
file::FileOpenType::kWriteOnly);
         }
     }
 
@@ -470,6 +467,10 @@ void nfs_client_impl::continue_write()
         LOG_ERROR("open file {} failed", file_path);
         handle_completion(fc->user_req, ERR_FILE_OPERATION_FAILED);
     } else {
+        LOG_DEBUG("nfs: copy to file {} [{}, {}]",
+                  file_path,
+                  reqc->response.offset,
+                  reqc->response.offset + reqc->response.size);
         zauto_lock l(reqc->lock);
         if (reqc->is_valid) {
             reqc->local_write_task = file::write(fc->file_holder->file_handle,
diff --git a/src/nfs/nfs_server_impl.cpp b/src/nfs/nfs_server_impl.cpp
index 08821042a..c4bcf7a2e 100644
--- a/src/nfs/nfs_server_impl.cpp
+++ b/src/nfs/nfs_server_impl.cpp
@@ -26,7 +26,6 @@
 
 #include "nfs/nfs_server_impl.h"
 
-#include <fcntl.h>
 #include <chrono>
 #include <cstdint>
 #include <mutex>
@@ -41,7 +40,6 @@
 #include "utils/env.h"
 #include "utils/filesystem.h"
 #include "utils/flags.h"
-#include "utils/ports.h"
 #include "utils/string_conv.h"
 #include "utils/utils.h"
 
@@ -93,7 +91,7 @@ void nfs_service_impl::on_copy(const 
::dsn::service::copy_request &request,
         zauto_lock l(_handles_map_lock);
         auto it = _handles_map.find(file_path); // find file handle cache first
         if (it == _handles_map.end()) {
-            dfile = file::open(file_path.c_str(), O_RDONLY | O_BINARY, 0);
+            dfile = file::open(file_path, file::FileOpenType::kReadOnly);
             if (dfile == nullptr) {
                 LOG_ERROR("[nfs_service] open file {} failed", file_path);
                 ::dsn::service::copy_response resp;
diff --git a/src/replica/duplication/test/load_from_private_log_test.cpp 
b/src/replica/duplication/test/load_from_private_log_test.cpp
index 2782f55f1..b215be60f 100644
--- a/src/replica/duplication/test/load_from_private_log_test.cpp
+++ b/src/replica/duplication/test/load_from_private_log_test.cpp
@@ -16,8 +16,8 @@
 // under the License.
 
 // IWYU pragma: no_include <ext/alloc_traits.h>
-#include <fcntl.h>
 #include <fmt/core.h>
+// IWYU pragma: no_include <gtest/gtest-param-test.h>
 // IWYU pragma: no_include <gtest/gtest-message.h>
 // IWYU pragma: no_include <gtest/gtest-param-test.h>
 // IWYU pragma: no_include <gtest/gtest-test-part.h>
@@ -54,7 +54,6 @@
 #include "utils/filesystem.h"
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
-#include "utils/ports.h"
 
 #define BOOST_NO_CXX11_SCOPED_ENUMS
 #include <boost/filesystem/operations.hpp>
@@ -459,7 +458,7 @@ TEST_F(load_fail_mode_test, fail_skip_real_corrupted_file)
         int64_t file_size;
         ASSERT_TRUE(utils::filesystem::file_size(
             log_path, dsn::utils::FileDataType::kSensitive, file_size));
-        auto wfile = file::open(log_path.c_str(), O_RDWR | O_CREAT | O_BINARY, 
0666);
+        auto wfile = file::open(log_path, file::FileOpenType::kWriteOnly);
         ASSERT_NE(wfile, nullptr);
 
         const char buf[] = "xxxxxx";
diff --git a/src/replica/log_file.cpp b/src/replica/log_file.cpp
index e239d4dae..973213b3a 100644
--- a/src/replica/log_file.cpp
+++ b/src/replica/log_file.cpp
@@ -26,7 +26,6 @@
 
 #include "log_file.h"
 
-#include <fcntl.h>
 #include <inttypes.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -99,7 +98,7 @@ log_file::~log_file() { close(); }
         return nullptr;
     }
 
-    disk_file *hfile = file::open(path, O_RDONLY | O_BINARY, 0);
+    disk_file *hfile = file::open(path, file::FileOpenType::kReadOnly);
     if (!hfile) {
         err = ERR_FILE_OPERATION_FAILED;
         LOG_WARNING("open log file {} failed", path);
@@ -155,7 +154,7 @@ log_file::~log_file() { close(); }
         return nullptr;
     }
 
-    disk_file *hfile = file::open(path, O_RDWR | O_CREAT | O_BINARY, 0666);
+    disk_file *hfile = file::open(path, file::FileOpenType::kWriteOnly);
     if (!hfile) {
         LOG_WARNING("create log {} failed", path);
         return nullptr;
@@ -268,6 +267,7 @@ aio_task_ptr log_file::commit_log_block(log_block &block,
     log_appender pending(offset, block);
     return commit_log_blocks(pending, evt, tracker, std::move(callback), hash);
 }
+
 aio_task_ptr log_file::commit_log_blocks(log_appender &pending,
                                          dsn::task_code evt,
                                          dsn::task_tracker *tracker,
@@ -333,7 +333,7 @@ aio_task_ptr log_file::commit_log_blocks(log_appender 
&pending,
                                  hash);
     }
 
-    if (utils::FLAGS_enable_latency_tracer) {
+    if (dsn_unlikely(utils::FLAGS_enable_latency_tracer)) {
         tsk->_tracer->set_parent_point_name("commit_pending_mutations");
         tsk->_tracer->set_description("log");
         for (const auto &mutation : pending.mutations()) {
diff --git a/src/replica/storage/simple_kv/simple_kv.server.impl.cpp 
b/src/replica/storage/simple_kv/simple_kv.server.impl.cpp
index 81a718a45..e78ae81ef 100644
--- a/src/replica/storage/simple_kv/simple_kv.server.impl.cpp
+++ b/src/replica/storage/simple_kv/simple_kv.server.impl.cpp
@@ -35,7 +35,6 @@
 
 #include "simple_kv.server.impl.h"
 
-#include <fcntl.h>
 #include <fmt/core.h>
 #include <inttypes.h>
 #include <rocksdb/slice.h>
@@ -61,7 +60,6 @@
 #include "utils/blob.h"
 #include "utils/filesystem.h"
 #include "utils/fmt_logging.h"
-#include "utils/ports.h"
 #include "utils/utils.h"
 
 namespace dsn {
@@ -249,7 +247,7 @@ void simple_kv_service_impl::recover(const std::string 
&name, int64_t version)
         return ERR_OK;
     }
 
-    auto wfile = file::open(fname.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666);
+    auto wfile = file::open(fname, file::FileOpenType::kWriteOnly);
     CHECK_NOTNULL(wfile, "");
 
 #define WRITE_DATA_SIZE(data, size)                                            
                    \
diff --git a/src/replica/storage/simple_kv/test/simple_kv.server.impl.cpp 
b/src/replica/storage/simple_kv/test/simple_kv.server.impl.cpp
index ae04a7f4e..021496194 100644
--- a/src/replica/storage/simple_kv/test/simple_kv.server.impl.cpp
+++ b/src/replica/storage/simple_kv/test/simple_kv.server.impl.cpp
@@ -25,7 +25,6 @@
 */
 #include "simple_kv.server.impl.h"
 
-#include <fcntl.h>
 #include <fmt/core.h>
 #include <inttypes.h>
 #include <stdio.h>
@@ -49,7 +48,6 @@
 #include "utils/blob.h"
 #include "utils/filesystem.h"
 #include "utils/fmt_logging.h"
-#include "utils/ports.h"
 #include "utils/threadpool_code.h"
 #include "utils/utils.h"
 
@@ -254,7 +252,7 @@ void simple_kv_service_impl::recover(const std::string 
&name, int64_t version)
     }
 
     std::string fname = fmt::format("{}/checkpoint.{}", data_dir(), 
last_commit);
-    auto wfile = file::open(fname.c_str(), O_RDWR | O_CREAT | O_BINARY, 0666);
+    auto wfile = file::open(fname, file::FileOpenType::kWriteOnly);
     CHECK_NOTNULL(wfile, "");
 
 #define WRITE_DATA_SIZE(data, size)                                            
                    \
diff --git a/src/replica/test/mutation_log_test.cpp 
b/src/replica/test/mutation_log_test.cpp
index 980a538bf..4f96d7d57 100644
--- a/src/replica/test/mutation_log_test.cpp
+++ b/src/replica/test/mutation_log_test.cpp
@@ -26,7 +26,6 @@
 
 #include "replica/mutation_log.h"
 
-#include <fcntl.h>
 // IWYU pragma: no_include <ext/alloc_traits.h>
 // IWYU pragma: no_include <gtest/gtest-message.h>
 // IWYU pragma: no_include <gtest/gtest-param-test.h>
@@ -70,7 +69,7 @@ using namespace ::dsn::replication;
 
 static void overwrite_file(const char *file, int offset, const void *buf, int 
size)
 {
-    auto wfile = file::open(file, O_RDWR | O_CREAT | O_BINARY, 0666);
+    auto wfile = file::open(file, file::FileOpenType::kWriteOnly);
     ASSERT_NE(wfile, nullptr);
     auto t = ::dsn::file::write(wfile,
                                 (const char *)buf,
diff --git a/src/runtime/test/CMakeLists.txt b/src/runtime/test/CMakeLists.txt
index c0146eb73..01f17e8c5 100644
--- a/src/runtime/test/CMakeLists.txt
+++ b/src/runtime/test/CMakeLists.txt
@@ -33,6 +33,7 @@ set(MY_PROJ_LIBS gtest
                  dsn_runtime
                  dsn_aio
                  dsn_meta_server
+                 rocksdb
                  )
 
 set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex)
diff --git a/src/runtime/test/task_test.cpp b/src/runtime/test/task_test.cpp
index ba4f3d887..5716c8ce4 100644
--- a/src/runtime/test/task_test.cpp
+++ b/src/runtime/test/task_test.cpp
@@ -17,7 +17,6 @@
 
 #include "runtime/task/task.h"
 
-#include <fcntl.h>
 // IWYU pragma: no_include <gtest/gtest-message.h>
 // IWYU pragma: no_include <gtest/gtest-test-part.h>
 #include <gtest/gtest.h>
@@ -66,7 +65,7 @@ public:
 
     static void test_signal_finished_task()
     {
-        disk_file *fp = file::open("config-test.ini", O_RDONLY | O_BINARY, 0);
+        disk_file *fp = file::open("config-test.ini", 
file::FileOpenType::kReadOnly);
 
         // this aio task is enqueued into read-queue of disk_engine
         char buffer[128];
@@ -80,6 +79,7 @@ public:
         // signal a finished task won't cause failure
         t->signal_waiters(); // signal_waiters may return false
         t->signal_waiters();
+        ASSERT_EQ(ERR_OK, file::close(fp));
     }
 };
 
diff --git a/src/test_util/test_util.cpp b/src/test_util/test_util.cpp
index 0789c4678..935d50631 100644
--- a/src/test_util/test_util.cpp
+++ b/src/test_util/test_util.cpp
@@ -18,7 +18,6 @@
 #include "test_util.h"
 
 #include <gtest/gtest-spi.h>
-#include <stdint.h>
 #include <chrono>
 #include <ostream>
 #include <thread>
diff --git a/src/test_util/test_util.h b/src/test_util/test_util.h
index debeb7fa9..23b1624be 100644
--- a/src/test_util/test_util.h
+++ b/src/test_util/test_util.h
@@ -19,10 +19,15 @@
 
 #pragma once
 
-#include <gtest/gtest.h>
+#include <chrono>
+#include <cstdint>
+#include <cstdio>
 #include <functional>
+#include <gtest/gtest.h>
 #include <string>
 
+#include "fmt/core.h"
+#include "runtime/api_layer1.h"
 #include "utils/flags.h"
 #include "utils/test_macros.h"
 
@@ -43,6 +48,23 @@ public:
     encrypt_data_test_base() { FLAGS_encrypt_data_at_rest = GetParam(); }
 };
 
+// Measures the wall-clock time elapsed since construction, as read from dsn_now_ms().
+class stop_watch
+{
+public:
+    stop_watch() { _start_ms = dsn_now_ms(); }
+    // Prints "<msg>, cost <N> ms" to stdout, where N is the milliseconds elapsed so far.
+    void stop_and_output(const std::string &msg)
+    {
+        // Both readings are in milliseconds, so the difference needs no unit conversion.
+        const auto duration_ms = static_cast<int64_t>(dsn_now_ms() - _start_ms);
+        fmt::print(stdout, "{}, cost {} ms\n", msg, duration_ms);
+    }
+
+private:
+    uint64_t _start_ms = 0;
+};
+
 void create_local_test_file(const std::string &full_name, 
dsn::replication::file_meta *fm);
 
 #define ASSERT_EVENTUALLY(expr)                                                
                    \
diff --git a/src/utils/long_adder_bench/long_adder_bench.cpp 
b/src/utils/long_adder_bench/long_adder_bench.cpp
index 93ec649c7..0c2a12dee 100644
--- a/src/utils/long_adder_bench/long_adder_bench.cpp
+++ b/src/utils/long_adder_bench/long_adder_bench.cpp
@@ -20,12 +20,11 @@
 #include <stdio.h>
 #include <algorithm>
 #include <atomic>
-#include <chrono>
 #include <cstdlib>
 #include <thread>
 #include <vector>
 
-#include "runtime/api_layer1.h"
+#include "test_util/test_util.h"
 #include "utils/long_adder.h"
 #include "utils/ports.h"
 #include "utils/process_utils.h"
@@ -133,7 +132,7 @@ void run_bench(int64_t num_operations, int64_t num_threads, 
const char *name)
 
     std::vector<std::thread> threads;
 
-    uint64_t start = dsn_now_ns();
+    pegasus::stop_watch sw;
     for (int64_t i = 0; i < num_threads; i++) {
         threads.emplace_back([num_operations, &adder]() {
             for (int64_t i = 0; i < num_operations; ++i) {
@@ -144,19 +143,11 @@ void run_bench(int64_t num_operations, int64_t 
num_threads, const char *name)
     for (auto &t : threads) {
         t.join();
     }
-    uint64_t end = dsn_now_ns();
-
-    auto duration_ns = static_cast<int64_t>(end - start);
-    std::chrono::nanoseconds nano(duration_ns);
-    auto duration_s = 
std::chrono::duration_cast<std::chrono::duration<double>>(nano).count();
-
-    fmt::print(stdout,
-               "Running {} operations of {} with {} threads took {} seconds, 
result = {}.\n",
-               num_operations,
-               name,
-               num_threads,
-               duration_s,
-               adder.value());
+    sw.stop_and_output(fmt::format("Running {} operations of {} with {} 
threads, result = {}",
+                                   num_operations,
+                                   name,
+                                   num_threads,
+                                   adder.value()));
 }
 
 int main(int argc, char **argv)
diff --git a/src/utils/test/env.cpp b/src/utils/test/env.cpp
index 813465de2..49e389729 100644
--- a/src/utils/test/env.cpp
+++ b/src/utils/test/env.cpp
@@ -47,8 +47,8 @@
 #include <string>
 
 #include "test_util/test_util.h"
-#include "utils/enum_helper.h"
 #include "utils/env.h"
+#include "utils/error_code.h"
 #include "utils/filesystem.h"
 #include "utils/flags.h"
 #include "utils/rand.h"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to