This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new e7c0589c4 refactor: use linux_fd_t instead of dsn_handle_t for file 
descriptor (#1238)
e7c0589c4 is described below

commit e7c0589c48b47a34fbf124f5c5dc7148bad0618e
Author: Yingchun Lai <[email protected]>
AuthorDate: Tue Nov 15 11:26:14 2022 +0800

    refactor: use linux_fd_t instead of dsn_handle_t for file descriptor (#1238)
---
 src/aio/aio_provider.h                | 17 ++++++----
 src/aio/aio_task.h                    | 13 ++++----
 src/aio/disk_engine.cpp               | 22 +++++--------
 src/aio/disk_engine.h                 |  7 ++--
 src/aio/file_io.cpp                   | 15 ++++-----
 src/aio/native_linux_aio_provider.cpp | 60 +++++++++++++++++++----------------
 src/aio/native_linux_aio_provider.h   |  6 ++--
 src/nfs/nfs_server_impl.cpp           | 25 +++++++--------
 src/nfs/nfs_server_impl.h             | 10 ++----
 9 files changed, 83 insertions(+), 92 deletions(-)

diff --git a/src/aio/aio_provider.h b/src/aio/aio_provider.h
index 12fd56656..261560d3f 100644
--- a/src/aio/aio_provider.h
+++ b/src/aio/aio_provider.h
@@ -37,7 +37,14 @@ class service_node;
 class task_worker_pool;
 class task_queue;
 
-#define DSN_INVALID_FILE_HANDLE ((dsn_handle_t)(uintptr_t)-1)
+#define DSN_INVALID_FILE_HANDLE -1
+struct linux_fd_t
+{
+    int fd;
+
+    explicit linux_fd_t(int f) : fd(f) {}
+    inline bool is_invalid() const { return fd == DSN_INVALID_FILE_HANDLE; }
+};
 
 class aio_provider
 {
@@ -53,12 +60,10 @@ public:
     explicit aio_provider(disk_engine *disk);
     virtual ~aio_provider() = default;
 
-    // return DSN_INVALID_FILE_HANDLE if failed
-    // TODO(wutao1): return uint64_t instead (because we only support linux 
now)
-    virtual dsn_handle_t open(const char *file_name, int flag, int pmode) = 0;
+    virtual linux_fd_t open(const char *file_name, int flag, int pmode) = 0;
 
-    virtual error_code close(dsn_handle_t fh) = 0;
-    virtual error_code flush(dsn_handle_t fh) = 0;
+    virtual error_code close(linux_fd_t fd) = 0;
+    virtual error_code flush(linux_fd_t fd) = 0;
     virtual error_code write(const aio_context &aio_ctx, /*out*/ uint64_t 
*processed_bytes) = 0;
     virtual error_code read(const aio_context &aio_ctx, /*out*/ uint64_t 
*processed_bytes) = 0;
 
diff --git a/src/aio/aio_task.h b/src/aio/aio_task.h
index 1d278fd61..ea3b1ce0d 100644
--- a/src/aio/aio_task.h
+++ b/src/aio/aio_task.h
@@ -26,10 +26,10 @@
 
 #pragma once
 
-#include "runtime/task/task.h"
-
 #include <vector>
 
+#include "runtime/task/task.h"
+
 namespace dsn {
 
 namespace utils {
@@ -50,11 +50,11 @@ typedef struct
 } dsn_file_buffer_t;
 
 class disk_engine;
+class disk_file;
 class aio_context : public ref_counter
 {
 public:
     // filled by apps
-    dsn_handle_t file;
     void *buffer;
     uint64_t buffer_size;
     uint64_t file_offset;
@@ -62,16 +62,15 @@ public:
     // filled by frameworks
     aio_type type;
     disk_engine *engine;
-    void *file_object; // TODO(wutao1): make it disk_file*, and distinguish it 
from `file`
+    disk_file *dfile;
 
     aio_context()
-        : file(nullptr),
-          buffer(nullptr),
+        : buffer(nullptr),
           buffer_size(0),
           file_offset(0),
           type(AIO_Invalid),
           engine(nullptr),
-          file_object(nullptr)
+          dfile(nullptr)
     {
     }
 };
diff --git a/src/aio/disk_engine.cpp b/src/aio/disk_engine.cpp
index 9fef5d58d..e69f5c982 100644
--- a/src/aio/disk_engine.cpp
+++ b/src/aio/disk_engine.cpp
@@ -91,7 +91,7 @@ aio_task *disk_write_queue::unlink_next_workload(void 
*plength)
     return first;
 }
 
-disk_file::disk_file(dsn_handle_t handle) : _handle(handle) {}
+disk_file::disk_file(linux_fd_t fd) : _fd(fd) {}
 
 aio_task *disk_file::read(aio_task *tsk)
 {
@@ -162,10 +162,9 @@ public:
 
     virtual void exec() override
     {
-        auto df = (disk_file *)_tasks->get_aio_context()->file_object;
+        auto dfile = _tasks->get_aio_context()->dfile;
         uint64_t sz;
-
-        auto wk = df->on_write_completed(_tasks, (void *)&sz, error(), 
get_transferred_size());
+        auto wk = dfile->on_write_completed(_tasks, (void *)&sz, error(), 
get_transferred_size());
         if (wk) {
             wk->get_aio_context()->engine->process_write(wk, sz);
         }
@@ -183,14 +182,10 @@ void disk_engine::write(aio_task *aio)
     }
 
     auto dio = aio->get_aio_context();
-    auto df = (disk_file *)dio->file;
-    dio->file = df->native_handle();
-    dio->file_object = df;
     dio->engine = this;
-    dio->type = AIO_Write;
 
     uint64_t sz;
-    auto wk = df->write(aio, &sz);
+    auto wk = dio->dfile->write(aio, &sz);
     if (wk) {
         process_write(wk, sz);
     }
@@ -213,8 +208,7 @@ void disk_engine::process_write(aio_task *aio, uint64_t sz)
         auto new_dio = new_task->get_aio_context();
         new_dio->buffer_size = sz;
         new_dio->file_offset = dio->file_offset;
-        new_dio->file = dio->file;
-        new_dio->file_object = dio->file_object;
+        new_dio->dfile = dio->dfile;
         new_dio->engine = dio->engine;
         new_dio->type = AIO_Write;
 
@@ -256,9 +250,9 @@ void disk_engine::complete_io(aio_task *aio, error_code 
err, uint64_t bytes)
 
     // no batching
     else {
-        auto df = (disk_file *)(aio->get_aio_context()->file_object);
+        auto dfile = aio->get_aio_context()->dfile;
         if (aio->get_aio_context()->type == AIO_Read) {
-            auto wk = df->on_read_completed(aio, err, (size_t)bytes);
+            auto wk = dfile->on_read_completed(aio, err, (size_t)bytes);
             if (wk) {
                 _provider->submit_aio_task(wk);
             }
@@ -267,7 +261,7 @@ void disk_engine::complete_io(aio_task *aio, error_code 
err, uint64_t bytes)
         // write
         else {
             uint64_t sz;
-            auto wk = df->on_write_completed(aio, (void *)&sz, err, 
(size_t)bytes);
+            auto wk = dfile->on_write_completed(aio, (void *)&sz, err, 
(size_t)bytes);
             if (wk) {
                 process_write(wk, sz);
             }
diff --git a/src/aio/disk_engine.h b/src/aio/disk_engine.h
index 4d821b6ea..8bf3e68ab 100644
--- a/src/aio/disk_engine.h
+++ b/src/aio/disk_engine.h
@@ -52,18 +52,17 @@ private:
 class disk_file
 {
 public:
-    disk_file(dsn_handle_t handle);
+    explicit disk_file(linux_fd_t fd);
     aio_task *read(aio_task *tsk);
     aio_task *write(aio_task *tsk, void *ctx);
 
     aio_task *on_read_completed(aio_task *wk, error_code err, size_t size);
     aio_task *on_write_completed(aio_task *wk, void *ctx, error_code err, 
size_t size);
 
-    // TODO(wutao1): make it uint64_t
-    dsn_handle_t native_handle() const { return _handle; }
+    linux_fd_t native_handle() const { return _fd; }
 
 private:
-    dsn_handle_t _handle;
+    linux_fd_t _fd;
     disk_write_queue _write_queue;
     work_queue<aio_task> _read_queue;
 };
diff --git a/src/aio/file_io.cpp b/src/aio/file_io.cpp
index e6e7c763f..39429ea7d 100644
--- a/src/aio/file_io.cpp
+++ b/src/aio/file_io.cpp
@@ -32,12 +32,12 @@ namespace file {
 
 /*extern*/ disk_file *open(const char *file_name, int flag, int pmode)
 {
-    dsn_handle_t nh = disk_engine::provider().open(file_name, flag, pmode);
-    if (nh != DSN_INVALID_FILE_HANDLE) {
-        return new disk_file(nh);
-    } else {
+    auto fd = disk_engine::provider().open(file_name, flag, pmode);
+    if (fd.is_invalid()) {
         return nullptr;
     }
+
+    return new disk_file(fd);
 }
 
 /*extern*/ error_code close(disk_file *file)
@@ -72,11 +72,10 @@ namespace file {
     auto cb = create_aio_task(callback_code, tracker, std::move(callback), 
hash);
     cb->get_aio_context()->buffer = buffer;
     cb->get_aio_context()->buffer_size = count;
-    cb->get_aio_context()->file_object = file;
-    cb->get_aio_context()->file = file->native_handle();
     cb->get_aio_context()->file_offset = offset;
     cb->get_aio_context()->type = AIO_Read;
     cb->get_aio_context()->engine = &disk_engine::instance();
+    cb->get_aio_context()->dfile = file;
 
     if (!cb->spec().on_aio_call.execute(task::get_current_task(), cb, true)) {
         cb->enqueue(ERR_FILE_OPERATION_FAILED, 0);
@@ -101,9 +100,9 @@ namespace file {
     auto cb = create_aio_task(callback_code, tracker, std::move(callback), 
hash);
     cb->get_aio_context()->buffer = (char *)buffer;
     cb->get_aio_context()->buffer_size = count;
-    cb->get_aio_context()->file = file;
     cb->get_aio_context()->file_offset = offset;
     cb->get_aio_context()->type = AIO_Write;
+    cb->get_aio_context()->dfile = file;
 
     disk_engine::instance().write(cb);
     return cb;
@@ -119,9 +118,9 @@ namespace file {
                                      int hash /*= 0*/)
 {
     auto cb = create_aio_task(callback_code, tracker, std::move(callback), 
hash);
-    cb->get_aio_context()->file = file;
     cb->get_aio_context()->file_offset = offset;
     cb->get_aio_context()->type = AIO_Write;
+    cb->get_aio_context()->dfile = file;
     for (int i = 0; i < buffer_count; i++) {
         if (buffers[i].size > 0) {
             cb->_unmerged_write_buffers.push_back(buffers[i]);
diff --git a/src/aio/native_linux_aio_provider.cpp 
b/src/aio/native_linux_aio_provider.cpp
index 3ff8dda22..3ca601c76 100644
--- a/src/aio/native_linux_aio_provider.cpp
+++ b/src/aio/native_linux_aio_provider.cpp
@@ -28,13 +28,14 @@
 
 #include <fcntl.h>
 
+#include "aio/disk_engine.h"
 #include "runtime/service_engine.h"
-
 #include "runtime/task/async_calls.h"
 #include "utils/api_utilities.h"
-#include "utils/fmt_logging.h"
 #include "utils/fail_point.h"
+#include "utils/fmt_logging.h"
 #include "utils/latency_tracer.h"
+#include "utils/safe_strerror_posix.h"
 
 namespace dsn {
 
@@ -42,33 +43,33 @@ 
native_linux_aio_provider::native_linux_aio_provider(disk_engine *disk) : aio_pr
 
 native_linux_aio_provider::~native_linux_aio_provider() {}
 
-dsn_handle_t native_linux_aio_provider::open(const char *file_name, int flag, 
int pmode)
+linux_fd_t native_linux_aio_provider::open(const char *file_name, int flag, 
int pmode)
 {
-    dsn_handle_t fh = (dsn_handle_t)(uintptr_t)::open(file_name, flag, pmode);
-    if (fh == DSN_INVALID_FILE_HANDLE) {
-        LOG_ERROR("create file failed, err = %s", strerror(errno));
+    auto fd = ::open(file_name, flag, pmode);
+    if (fd == DSN_INVALID_FILE_HANDLE) {
+        LOG_ERROR_F("create file failed, err = {}", 
utils::safe_strerror(errno));
     }
-    return fh;
+    return linux_fd_t(fd);
 }
 
-error_code native_linux_aio_provider::close(dsn_handle_t fh)
+error_code native_linux_aio_provider::close(linux_fd_t fd)
 {
-    if (fh == DSN_INVALID_FILE_HANDLE || ::close((int)(uintptr_t)(fh)) == 0) {
+    if (fd.is_invalid() || ::close(fd.fd) == 0) {
         return ERR_OK;
-    } else {
-        LOG_ERROR("close file failed, err = %s", strerror(errno));
-        return ERR_FILE_OPERATION_FAILED;
     }
+
+    LOG_ERROR_F("close file failed, err = {}", utils::safe_strerror(errno));
+    return ERR_FILE_OPERATION_FAILED;
 }
 
-error_code native_linux_aio_provider::flush(dsn_handle_t fh)
+error_code native_linux_aio_provider::flush(linux_fd_t fd)
 {
-    if (fh == DSN_INVALID_FILE_HANDLE || ::fsync((int)(uintptr_t)(fh)) == 0) {
+    if (fd.is_invalid() || ::fsync(fd.fd) == 0) {
         return ERR_OK;
-    } else {
-        LOG_ERROR("flush file failed, err = %s", strerror(errno));
-        return ERR_FILE_OPERATION_FAILED;
     }
+
+    LOG_ERROR_F("flush file failed, err = {}", utils::safe_strerror(errno));
+    return ERR_FILE_OPERATION_FAILED;
 }
 
 error_code native_linux_aio_provider::write(const aio_context &aio_ctx,
@@ -78,17 +79,19 @@ error_code native_linux_aio_provider::write(const 
aio_context &aio_ctx,
     uint64_t buffer_offset = 0;
     do {
         // ret is the written data size
-        auto ret = pwrite(static_cast<int>((ssize_t)aio_ctx.file),
-                          (char *)aio_ctx.buffer + buffer_offset,
-                          aio_ctx.buffer_size - buffer_offset,
-                          aio_ctx.file_offset + buffer_offset);
+        auto ret = ::pwrite(aio_ctx.dfile->native_handle().fd,
+                            (char *)aio_ctx.buffer + buffer_offset,
+                            aio_ctx.buffer_size - buffer_offset,
+                            aio_ctx.file_offset + buffer_offset);
         if (dsn_unlikely(ret < 0)) {
             if (errno == EINTR) {
-                LOG_WARNING_F("write failed with errno={} and will retry it.", 
strerror(errno));
+                LOG_WARNING_F("write failed with errno={} and will retry it.",
+                              utils::safe_strerror(errno));
                 continue;
             }
             resp = ERR_FILE_OPERATION_FAILED;
-            LOG_ERROR_F("write failed with errno={}, return {}.", 
strerror(errno), resp);
+            LOG_ERROR_F(
+                "write failed with errno={}, return {}.", 
utils::safe_strerror(errno), resp);
             return resp;
         }
 
@@ -117,11 +120,12 @@ error_code native_linux_aio_provider::write(const 
aio_context &aio_ctx,
 error_code native_linux_aio_provider::read(const aio_context &aio_ctx,
                                            /*out*/ uint64_t *processed_bytes)
 {
-    ssize_t ret = pread(static_cast<int>((ssize_t)aio_ctx.file),
-                        aio_ctx.buffer,
-                        aio_ctx.buffer_size,
-                        aio_ctx.file_offset);
-    if (ret < 0) {
+    auto ret = ::pread(aio_ctx.dfile->native_handle().fd,
+                       aio_ctx.buffer,
+                       aio_ctx.buffer_size,
+                       aio_ctx.file_offset);
+    if (dsn_unlikely(ret < 0)) {
+        LOG_WARNING_F("write failed with errno={} and will retry it.", 
utils::safe_strerror(errno));
         return ERR_FILE_OPERATION_FAILED;
     }
     if (ret == 0) {
diff --git a/src/aio/native_linux_aio_provider.h 
b/src/aio/native_linux_aio_provider.h
index 48c02f26c..42f458cdb 100644
--- a/src/aio/native_linux_aio_provider.h
+++ b/src/aio/native_linux_aio_provider.h
@@ -36,9 +36,9 @@ public:
     explicit native_linux_aio_provider(disk_engine *disk);
     ~native_linux_aio_provider() override;
 
-    dsn_handle_t open(const char *file_name, int flag, int pmode) override;
-    error_code close(dsn_handle_t fh) override;
-    error_code flush(dsn_handle_t fh) override;
+    linux_fd_t open(const char *file_name, int flag, int pmode) override;
+    error_code close(linux_fd_t fd) override;
+    error_code flush(linux_fd_t fd) override;
     error_code write(const aio_context &aio_ctx, /*out*/ uint64_t 
*processed_bytes) override;
     error_code read(const aio_context &aio_ctx, /*out*/ uint64_t 
*processed_bytes) override;
 
diff --git a/src/nfs/nfs_server_impl.cpp b/src/nfs/nfs_server_impl.cpp
index 3331b77be..a93d77a9f 100644
--- a/src/nfs/nfs_server_impl.cpp
+++ b/src/nfs/nfs_server_impl.cpp
@@ -26,11 +26,12 @@
 
 #include "nfs_server_impl.h"
 
-#include <sys/stat.h>
 #include <fcntl.h>
+#include <sys/stat.h>
 
 #include <cstdlib>
 
+#include "aio/disk_engine.h"
 #include "utils/filesystem.h"
 #include "runtime/task/async_calls.h"
 
@@ -76,26 +77,23 @@ void nfs_service_impl::on_copy(const 
::dsn::service::copy_request &request,
 
     std::string file_path =
         dsn::utils::filesystem::path_combine(request.source_dir, 
request.file_name);
-    disk_file *hfile;
+    disk_file *dfile = nullptr;
 
     {
         zauto_lock l(_handles_map_lock);
         auto it = _handles_map.find(file_path); // find file handle cache first
 
-        if (it == _handles_map.end()) // not found
-        {
-            hfile = file::open(file_path.c_str(), O_RDONLY | O_BINARY, 0);
-            if (hfile) {
-
+        if (it == _handles_map.end()) {
+            dfile = file::open(file_path.c_str(), O_RDONLY | O_BINARY, 0);
+            if (dfile != nullptr) {
                 auto fh = std::make_shared<file_handle_info_on_server>();
-                fh->file_handle = hfile;
+                fh->file_handle = dfile;
                 fh->file_access_count = 1;
                 fh->last_access_time = dsn_now_ms();
                 _handles_map.insert(std::make_pair(file_path, std::move(fh)));
             }
-        } else // found
-        {
-            hfile = it->second->file_handle;
+        } else {
+            dfile = it->second->file_handle;
             it->second->file_access_count++;
             it->second->last_access_time = dsn_now_ms();
         }
@@ -106,7 +104,7 @@ void nfs_service_impl::on_copy(const 
::dsn::service::copy_request &request,
               request.offset,
               request.offset + request.size);
 
-    if (hfile == 0) {
+    if (dfile == nullptr) {
         LOG_ERROR("{nfs_service} open file %s failed", file_path.c_str());
         ::dsn::service::copy_response resp;
         resp.error = ERR_OBJECT_NOT_FOUND;
@@ -119,14 +117,13 @@ void nfs_service_impl::on_copy(const 
::dsn::service::copy_request &request,
     cp->dst_dir = request.dst_dir;
     cp->source_disk_tag = request.source_disk_tag;
     cp->file_path = std::move(file_path);
-    cp->hfile = hfile;
     cp->offset = request.offset;
     cp->size = request.size;
 
     auto buffer_save = cp->bb.buffer().get();
 
     file::read(
-        hfile,
+        dfile,
         buffer_save,
         request.size,
         request.offset,
diff --git a/src/nfs/nfs_server_impl.h b/src/nfs/nfs_server_impl.h
index 987104c15..ae9057596 100644
--- a/src/nfs/nfs_server_impl.h
+++ b/src/nfs/nfs_server_impl.h
@@ -71,7 +71,6 @@ protected:
 private:
     struct callback_para
     {
-        dsn_handle_t hfile;
         std::string source_disk_tag;
         std::string file_path;
         std::string dst_dir;
@@ -80,20 +79,15 @@ private:
         uint32_t size;
         rpc_replier<copy_response> replier;
 
-        callback_para(rpc_replier<copy_response> &&r)
-            : hfile(nullptr), offset(0), size(0), replier(std::move(r))
-        {
-        }
+        callback_para(rpc_replier<copy_response> &&r) : offset(0), size(0), 
replier(std::move(r)) {}
         callback_para(callback_para &&r)
-            : hfile(r.hfile),
-              file_path(std::move(r.file_path)),
+            : file_path(std::move(r.file_path)),
               dst_dir(std::move(r.dst_dir)),
               bb(std::move(r.bb)),
               offset(r.offset),
               size(r.size),
               replier(std::move(r.replier))
         {
-            r.hfile = nullptr;
             r.offset = 0;
             r.size = 0;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to