This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new e7c0589c4 refactor: use linux_fd_t instead of dsn_handle_t for file descriptor (#1238)
e7c0589c4 is described below
commit e7c0589c48b47a34fbf124f5c5dc7148bad0618e
Author: Yingchun Lai <[email protected]>
AuthorDate: Tue Nov 15 11:26:14 2022 +0800
refactor: use linux_fd_t instead of dsn_handle_t for file descriptor (#1238)
---
src/aio/aio_provider.h | 17 ++++++----
src/aio/aio_task.h | 13 ++++----
src/aio/disk_engine.cpp | 22 +++++--------
src/aio/disk_engine.h | 7 ++--
src/aio/file_io.cpp | 15 ++++-----
src/aio/native_linux_aio_provider.cpp | 60 +++++++++++++++++++----------------
src/aio/native_linux_aio_provider.h | 6 ++--
src/nfs/nfs_server_impl.cpp | 25 +++++++--------
src/nfs/nfs_server_impl.h | 10 ++----
9 files changed, 83 insertions(+), 92 deletions(-)
diff --git a/src/aio/aio_provider.h b/src/aio/aio_provider.h
index 12fd56656..261560d3f 100644
--- a/src/aio/aio_provider.h
+++ b/src/aio/aio_provider.h
@@ -37,7 +37,14 @@ class service_node;
class task_worker_pool;
class task_queue;
-#define DSN_INVALID_FILE_HANDLE ((dsn_handle_t)(uintptr_t)-1)
+#define DSN_INVALID_FILE_HANDLE -1
+struct linux_fd_t
+{
+ int fd;
+
+ explicit linux_fd_t(int f) : fd(f) {}
+ inline bool is_invalid() const { return fd == DSN_INVALID_FILE_HANDLE; }
+};
class aio_provider
{
@@ -53,12 +60,10 @@ public:
explicit aio_provider(disk_engine *disk);
virtual ~aio_provider() = default;
- // return DSN_INVALID_FILE_HANDLE if failed
- // TODO(wutao1): return uint64_t instead (because we only support linux now)
- virtual dsn_handle_t open(const char *file_name, int flag, int pmode) = 0;
+ virtual linux_fd_t open(const char *file_name, int flag, int pmode) = 0;
- virtual error_code close(dsn_handle_t fh) = 0;
- virtual error_code flush(dsn_handle_t fh) = 0;
+ virtual error_code close(linux_fd_t fd) = 0;
+ virtual error_code flush(linux_fd_t fd) = 0;
virtual error_code write(const aio_context &aio_ctx, /*out*/ uint64_t
*processed_bytes) = 0;
virtual error_code read(const aio_context &aio_ctx, /*out*/ uint64_t
*processed_bytes) = 0;
diff --git a/src/aio/aio_task.h b/src/aio/aio_task.h
index 1d278fd61..ea3b1ce0d 100644
--- a/src/aio/aio_task.h
+++ b/src/aio/aio_task.h
@@ -26,10 +26,10 @@
#pragma once
-#include "runtime/task/task.h"
-
#include <vector>
+#include "runtime/task/task.h"
+
namespace dsn {
namespace utils {
@@ -50,11 +50,11 @@ typedef struct
} dsn_file_buffer_t;
class disk_engine;
+class disk_file;
class aio_context : public ref_counter
{
public:
// filled by apps
- dsn_handle_t file;
void *buffer;
uint64_t buffer_size;
uint64_t file_offset;
@@ -62,16 +62,15 @@ public:
// filled by frameworks
aio_type type;
disk_engine *engine;
- void *file_object; // TODO(wutao1): make it disk_file*, and distinguish it from `file`
+ disk_file *dfile;
aio_context()
- : file(nullptr),
- buffer(nullptr),
+ : buffer(nullptr),
buffer_size(0),
file_offset(0),
type(AIO_Invalid),
engine(nullptr),
- file_object(nullptr)
+ dfile(nullptr)
{
}
};
diff --git a/src/aio/disk_engine.cpp b/src/aio/disk_engine.cpp
index 9fef5d58d..e69f5c982 100644
--- a/src/aio/disk_engine.cpp
+++ b/src/aio/disk_engine.cpp
@@ -91,7 +91,7 @@ aio_task *disk_write_queue::unlink_next_workload(void
*plength)
return first;
}
-disk_file::disk_file(dsn_handle_t handle) : _handle(handle) {}
+disk_file::disk_file(linux_fd_t fd) : _fd(fd) {}
aio_task *disk_file::read(aio_task *tsk)
{
@@ -162,10 +162,9 @@ public:
virtual void exec() override
{
- auto df = (disk_file *)_tasks->get_aio_context()->file_object;
+ auto dfile = _tasks->get_aio_context()->dfile;
uint64_t sz;
-
- auto wk = df->on_write_completed(_tasks, (void *)&sz, error(),
get_transferred_size());
+ auto wk = dfile->on_write_completed(_tasks, (void *)&sz, error(),
get_transferred_size());
if (wk) {
wk->get_aio_context()->engine->process_write(wk, sz);
}
@@ -183,14 +182,10 @@ void disk_engine::write(aio_task *aio)
}
auto dio = aio->get_aio_context();
- auto df = (disk_file *)dio->file;
- dio->file = df->native_handle();
- dio->file_object = df;
dio->engine = this;
- dio->type = AIO_Write;
uint64_t sz;
- auto wk = df->write(aio, &sz);
+ auto wk = dio->dfile->write(aio, &sz);
if (wk) {
process_write(wk, sz);
}
@@ -213,8 +208,7 @@ void disk_engine::process_write(aio_task *aio, uint64_t sz)
auto new_dio = new_task->get_aio_context();
new_dio->buffer_size = sz;
new_dio->file_offset = dio->file_offset;
- new_dio->file = dio->file;
- new_dio->file_object = dio->file_object;
+ new_dio->dfile = dio->dfile;
new_dio->engine = dio->engine;
new_dio->type = AIO_Write;
@@ -256,9 +250,9 @@ void disk_engine::complete_io(aio_task *aio, error_code
err, uint64_t bytes)
// no batching
else {
- auto df = (disk_file *)(aio->get_aio_context()->file_object);
+ auto dfile = aio->get_aio_context()->dfile;
if (aio->get_aio_context()->type == AIO_Read) {
- auto wk = df->on_read_completed(aio, err, (size_t)bytes);
+ auto wk = dfile->on_read_completed(aio, err, (size_t)bytes);
if (wk) {
_provider->submit_aio_task(wk);
}
@@ -267,7 +261,7 @@ void disk_engine::complete_io(aio_task *aio, error_code
err, uint64_t bytes)
// write
else {
uint64_t sz;
- auto wk = df->on_write_completed(aio, (void *)&sz, err,
(size_t)bytes);
+ auto wk = dfile->on_write_completed(aio, (void *)&sz, err,
(size_t)bytes);
if (wk) {
process_write(wk, sz);
}
diff --git a/src/aio/disk_engine.h b/src/aio/disk_engine.h
index 4d821b6ea..8bf3e68ab 100644
--- a/src/aio/disk_engine.h
+++ b/src/aio/disk_engine.h
@@ -52,18 +52,17 @@ private:
class disk_file
{
public:
- disk_file(dsn_handle_t handle);
+ explicit disk_file(linux_fd_t fd);
aio_task *read(aio_task *tsk);
aio_task *write(aio_task *tsk, void *ctx);
aio_task *on_read_completed(aio_task *wk, error_code err, size_t size);
aio_task *on_write_completed(aio_task *wk, void *ctx, error_code err,
size_t size);
- // TODO(wutao1): make it uint64_t
- dsn_handle_t native_handle() const { return _handle; }
+ linux_fd_t native_handle() const { return _fd; }
private:
- dsn_handle_t _handle;
+ linux_fd_t _fd;
disk_write_queue _write_queue;
work_queue<aio_task> _read_queue;
};
diff --git a/src/aio/file_io.cpp b/src/aio/file_io.cpp
index e6e7c763f..39429ea7d 100644
--- a/src/aio/file_io.cpp
+++ b/src/aio/file_io.cpp
@@ -32,12 +32,12 @@ namespace file {
/*extern*/ disk_file *open(const char *file_name, int flag, int pmode)
{
- dsn_handle_t nh = disk_engine::provider().open(file_name, flag, pmode);
- if (nh != DSN_INVALID_FILE_HANDLE) {
- return new disk_file(nh);
- } else {
+ auto fd = disk_engine::provider().open(file_name, flag, pmode);
+ if (fd.is_invalid()) {
return nullptr;
}
+
+ return new disk_file(fd);
}
/*extern*/ error_code close(disk_file *file)
@@ -72,11 +72,10 @@ namespace file {
auto cb = create_aio_task(callback_code, tracker, std::move(callback),
hash);
cb->get_aio_context()->buffer = buffer;
cb->get_aio_context()->buffer_size = count;
- cb->get_aio_context()->file_object = file;
- cb->get_aio_context()->file = file->native_handle();
cb->get_aio_context()->file_offset = offset;
cb->get_aio_context()->type = AIO_Read;
cb->get_aio_context()->engine = &disk_engine::instance();
+ cb->get_aio_context()->dfile = file;
if (!cb->spec().on_aio_call.execute(task::get_current_task(), cb, true)) {
cb->enqueue(ERR_FILE_OPERATION_FAILED, 0);
@@ -101,9 +100,9 @@ namespace file {
auto cb = create_aio_task(callback_code, tracker, std::move(callback),
hash);
cb->get_aio_context()->buffer = (char *)buffer;
cb->get_aio_context()->buffer_size = count;
- cb->get_aio_context()->file = file;
cb->get_aio_context()->file_offset = offset;
cb->get_aio_context()->type = AIO_Write;
+ cb->get_aio_context()->dfile = file;
disk_engine::instance().write(cb);
return cb;
@@ -119,9 +118,9 @@ namespace file {
int hash /*= 0*/)
{
auto cb = create_aio_task(callback_code, tracker, std::move(callback),
hash);
- cb->get_aio_context()->file = file;
cb->get_aio_context()->file_offset = offset;
cb->get_aio_context()->type = AIO_Write;
+ cb->get_aio_context()->dfile = file;
for (int i = 0; i < buffer_count; i++) {
if (buffers[i].size > 0) {
cb->_unmerged_write_buffers.push_back(buffers[i]);
diff --git a/src/aio/native_linux_aio_provider.cpp
b/src/aio/native_linux_aio_provider.cpp
index 3ff8dda22..3ca601c76 100644
--- a/src/aio/native_linux_aio_provider.cpp
+++ b/src/aio/native_linux_aio_provider.cpp
@@ -28,13 +28,14 @@
#include <fcntl.h>
+#include "aio/disk_engine.h"
#include "runtime/service_engine.h"
-
#include "runtime/task/async_calls.h"
#include "utils/api_utilities.h"
-#include "utils/fmt_logging.h"
#include "utils/fail_point.h"
+#include "utils/fmt_logging.h"
#include "utils/latency_tracer.h"
+#include "utils/safe_strerror_posix.h"
namespace dsn {
@@ -42,33 +43,33 @@
native_linux_aio_provider::native_linux_aio_provider(disk_engine *disk) : aio_pr
native_linux_aio_provider::~native_linux_aio_provider() {}
-dsn_handle_t native_linux_aio_provider::open(const char *file_name, int flag,
int pmode)
+linux_fd_t native_linux_aio_provider::open(const char *file_name, int flag,
int pmode)
{
- dsn_handle_t fh = (dsn_handle_t)(uintptr_t)::open(file_name, flag, pmode);
- if (fh == DSN_INVALID_FILE_HANDLE) {
- LOG_ERROR("create file failed, err = %s", strerror(errno));
+ auto fd = ::open(file_name, flag, pmode);
+ if (fd == DSN_INVALID_FILE_HANDLE) {
+ LOG_ERROR_F("create file failed, err = {}",
utils::safe_strerror(errno));
}
- return fh;
+ return linux_fd_t(fd);
}
-error_code native_linux_aio_provider::close(dsn_handle_t fh)
+error_code native_linux_aio_provider::close(linux_fd_t fd)
{
- if (fh == DSN_INVALID_FILE_HANDLE || ::close((int)(uintptr_t)(fh)) == 0) {
+ if (fd.is_invalid() || ::close(fd.fd) == 0) {
return ERR_OK;
- } else {
- LOG_ERROR("close file failed, err = %s", strerror(errno));
- return ERR_FILE_OPERATION_FAILED;
}
+
+ LOG_ERROR_F("close file failed, err = {}", utils::safe_strerror(errno));
+ return ERR_FILE_OPERATION_FAILED;
}
-error_code native_linux_aio_provider::flush(dsn_handle_t fh)
+error_code native_linux_aio_provider::flush(linux_fd_t fd)
{
- if (fh == DSN_INVALID_FILE_HANDLE || ::fsync((int)(uintptr_t)(fh)) == 0) {
+ if (fd.is_invalid() || ::fsync(fd.fd) == 0) {
return ERR_OK;
- } else {
- LOG_ERROR("flush file failed, err = %s", strerror(errno));
- return ERR_FILE_OPERATION_FAILED;
}
+
+ LOG_ERROR_F("flush file failed, err = {}", utils::safe_strerror(errno));
+ return ERR_FILE_OPERATION_FAILED;
}
error_code native_linux_aio_provider::write(const aio_context &aio_ctx,
@@ -78,17 +79,19 @@ error_code native_linux_aio_provider::write(const
aio_context &aio_ctx,
uint64_t buffer_offset = 0;
do {
// ret is the written data size
- auto ret = pwrite(static_cast<int>((ssize_t)aio_ctx.file),
- (char *)aio_ctx.buffer + buffer_offset,
- aio_ctx.buffer_size - buffer_offset,
- aio_ctx.file_offset + buffer_offset);
+ auto ret = ::pwrite(aio_ctx.dfile->native_handle().fd,
+ (char *)aio_ctx.buffer + buffer_offset,
+ aio_ctx.buffer_size - buffer_offset,
+ aio_ctx.file_offset + buffer_offset);
if (dsn_unlikely(ret < 0)) {
if (errno == EINTR) {
- LOG_WARNING_F("write failed with errno={} and will retry it.",
strerror(errno));
+ LOG_WARNING_F("write failed with errno={} and will retry it.",
+ utils::safe_strerror(errno));
continue;
}
resp = ERR_FILE_OPERATION_FAILED;
- LOG_ERROR_F("write failed with errno={}, return {}.",
strerror(errno), resp);
+ LOG_ERROR_F(
+ "write failed with errno={}, return {}.",
utils::safe_strerror(errno), resp);
return resp;
}
@@ -117,11 +120,12 @@ error_code native_linux_aio_provider::write(const
aio_context &aio_ctx,
error_code native_linux_aio_provider::read(const aio_context &aio_ctx,
/*out*/ uint64_t *processed_bytes)
{
- ssize_t ret = pread(static_cast<int>((ssize_t)aio_ctx.file),
- aio_ctx.buffer,
- aio_ctx.buffer_size,
- aio_ctx.file_offset);
- if (ret < 0) {
+ auto ret = ::pread(aio_ctx.dfile->native_handle().fd,
+ aio_ctx.buffer,
+ aio_ctx.buffer_size,
+ aio_ctx.file_offset);
+ if (dsn_unlikely(ret < 0)) {
+ LOG_WARNING_F("write failed with errno={} and will retry it.",
utils::safe_strerror(errno));
return ERR_FILE_OPERATION_FAILED;
}
if (ret == 0) {
diff --git a/src/aio/native_linux_aio_provider.h
b/src/aio/native_linux_aio_provider.h
index 48c02f26c..42f458cdb 100644
--- a/src/aio/native_linux_aio_provider.h
+++ b/src/aio/native_linux_aio_provider.h
@@ -36,9 +36,9 @@ public:
explicit native_linux_aio_provider(disk_engine *disk);
~native_linux_aio_provider() override;
- dsn_handle_t open(const char *file_name, int flag, int pmode) override;
- error_code close(dsn_handle_t fh) override;
- error_code flush(dsn_handle_t fh) override;
+ linux_fd_t open(const char *file_name, int flag, int pmode) override;
+ error_code close(linux_fd_t fd) override;
+ error_code flush(linux_fd_t fd) override;
error_code write(const aio_context &aio_ctx, /*out*/ uint64_t
*processed_bytes) override;
error_code read(const aio_context &aio_ctx, /*out*/ uint64_t
*processed_bytes) override;
diff --git a/src/nfs/nfs_server_impl.cpp b/src/nfs/nfs_server_impl.cpp
index 3331b77be..a93d77a9f 100644
--- a/src/nfs/nfs_server_impl.cpp
+++ b/src/nfs/nfs_server_impl.cpp
@@ -26,11 +26,12 @@
#include "nfs_server_impl.h"
-#include <sys/stat.h>
#include <fcntl.h>
+#include <sys/stat.h>
#include <cstdlib>
+#include "aio/disk_engine.h"
#include "utils/filesystem.h"
#include "runtime/task/async_calls.h"
@@ -76,26 +77,23 @@ void nfs_service_impl::on_copy(const
::dsn::service::copy_request &request,
std::string file_path =
dsn::utils::filesystem::path_combine(request.source_dir,
request.file_name);
- disk_file *hfile;
+ disk_file *dfile = nullptr;
{
zauto_lock l(_handles_map_lock);
auto it = _handles_map.find(file_path); // find file handle cache first
- if (it == _handles_map.end()) // not found
- {
- hfile = file::open(file_path.c_str(), O_RDONLY | O_BINARY, 0);
- if (hfile) {
-
+ if (it == _handles_map.end()) {
+ dfile = file::open(file_path.c_str(), O_RDONLY | O_BINARY, 0);
+ if (dfile != nullptr) {
auto fh = std::make_shared<file_handle_info_on_server>();
- fh->file_handle = hfile;
+ fh->file_handle = dfile;
fh->file_access_count = 1;
fh->last_access_time = dsn_now_ms();
_handles_map.insert(std::make_pair(file_path, std::move(fh)));
}
- } else // found
- {
- hfile = it->second->file_handle;
+ } else {
+ dfile = it->second->file_handle;
it->second->file_access_count++;
it->second->last_access_time = dsn_now_ms();
}
@@ -106,7 +104,7 @@ void nfs_service_impl::on_copy(const
::dsn::service::copy_request &request,
request.offset,
request.offset + request.size);
- if (hfile == 0) {
+ if (dfile == nullptr) {
LOG_ERROR("{nfs_service} open file %s failed", file_path.c_str());
::dsn::service::copy_response resp;
resp.error = ERR_OBJECT_NOT_FOUND;
@@ -119,14 +117,13 @@ void nfs_service_impl::on_copy(const
::dsn::service::copy_request &request,
cp->dst_dir = request.dst_dir;
cp->source_disk_tag = request.source_disk_tag;
cp->file_path = std::move(file_path);
- cp->hfile = hfile;
cp->offset = request.offset;
cp->size = request.size;
auto buffer_save = cp->bb.buffer().get();
file::read(
- hfile,
+ dfile,
buffer_save,
request.size,
request.offset,
diff --git a/src/nfs/nfs_server_impl.h b/src/nfs/nfs_server_impl.h
index 987104c15..ae9057596 100644
--- a/src/nfs/nfs_server_impl.h
+++ b/src/nfs/nfs_server_impl.h
@@ -71,7 +71,6 @@ protected:
private:
struct callback_para
{
- dsn_handle_t hfile;
std::string source_disk_tag;
std::string file_path;
std::string dst_dir;
@@ -80,20 +79,15 @@ private:
uint32_t size;
rpc_replier<copy_response> replier;
- callback_para(rpc_replier<copy_response> &&r)
- : hfile(nullptr), offset(0), size(0), replier(std::move(r))
- {
- }
+ callback_para(rpc_replier<copy_response> &&r) : offset(0), size(0),
replier(std::move(r)) {}
callback_para(callback_para &&r)
- : hfile(r.hfile),
- file_path(std::move(r.file_path)),
+ : file_path(std::move(r.file_path)),
dst_dir(std::move(r.dst_dir)),
bb(std::move(r.bb)),
offset(r.offset),
size(r.size),
replier(std::move(r.replier))
{
- r.hfile = nullptr;
r.offset = 0;
r.size = 0;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]