This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 73d53dbbb refactor(hdfs): use rocksdb API to read/write file (#1618)
73d53dbbb is described below
commit 73d53dbbb923fc15a7b3b5f4def8186c2fc97c73
Author: Yingchun Lai <[email protected]>
AuthorDate: Mon Sep 25 16:45:15 2023 +0800
refactor(hdfs): use rocksdb API to read/write file (#1618)
https://github.com/apache/incubator-pegasus/issues/887
There are no functional changes; this commit only refactors the HDFS service and
related unit tests.
The directio_writable_file.* files have been removed because their functionality
has been implemented by the rocksdb API options.
---
src/block_service/directio_writable_file.cpp | 169 -------------
src/block_service/directio_writable_file.h | 57 -----
src/block_service/hdfs/CMakeLists.txt | 8 +-
src/block_service/hdfs/hdfs_service.cpp | 181 ++++++++------
src/block_service/test/config-test.ini | 4 +-
src/block_service/test/hdfs_service_test.cpp | 358 ++++++++++++++++-----------
src/replica/CMakeLists.txt | 2 +-
src/utils/env.cpp | 8 +
src/utils/env.h | 4 +-
9 files changed, 333 insertions(+), 458 deletions(-)
diff --git a/src/block_service/directio_writable_file.cpp
b/src/block_service/directio_writable_file.cpp
deleted file mode 100644
index 2c74ae05d..000000000
--- a/src/block_service/directio_writable_file.cpp
+++ /dev/null
@@ -1,169 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "block_service/directio_writable_file.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <stdlib.h> // posix_memalign
-#include <sys/stat.h>
-#include <unistd.h> // getpagesize
-#include <algorithm>
-#include <cstring>
-#include <string>
-
-#include "utils/flags.h"
-#include "utils/fmt_logging.h"
-#include "utils/safe_strerror_posix.h"
-
-namespace dsn {
-namespace dist {
-namespace block_service {
-
-DSN_DEFINE_uint32(replication,
- direct_io_buffer_pages,
- 64,
- "Number of pages we need to set to direct io buffer");
-DSN_TAG_VARIABLE(direct_io_buffer_pages, FT_MUTABLE);
-
-DSN_DEFINE_bool(replication,
- enable_direct_io,
- false,
- "Whether to enable direct I/O when download files");
-DSN_TAG_VARIABLE(enable_direct_io, FT_MUTABLE);
-
-const uint32_t g_page_size = getpagesize();
-
-direct_io_writable_file::direct_io_writable_file(const std::string &file_path)
- : _file_path(file_path),
- _fd(-1),
- _file_size(0),
- _buffer(nullptr),
- _buffer_size(FLAGS_direct_io_buffer_pages * g_page_size),
- _offset(0)
-{
-}
-
-direct_io_writable_file::~direct_io_writable_file()
-{
- if (!_buffer || _fd < 0) {
- return;
- }
-    // Here is an assurance, users should call finalize manually
- CHECK_EQ_MSG(_offset, 0, "finalize() should be called before destructor");
-
- ::free(_buffer);
- CHECK_EQ_MSG(
- 0, ::close(_fd), "Failed to close {}, err = {}", _file_path,
utils::safe_strerror(errno));
-}
-
-bool direct_io_writable_file::initialize()
-{
- if (posix_memalign(&_buffer, g_page_size, _buffer_size) != 0) {
- LOG_ERROR("Allocate memaligned buffer failed, err = {}",
utils::safe_strerror(errno));
- return false;
- }
-
- int flag = O_WRONLY | O_TRUNC | O_CREAT;
-#if !defined(__APPLE__)
- flag |= O_DIRECT;
-#endif
- // TODO(yingchun): there maybe serious error of the disk driver when these
system call failed,
- // maybe just terminate the process or mark the disk as failed would be
better
- _fd = ::open(_file_path.c_str(), flag, S_IRUSR | S_IWUSR | S_IRGRP);
- if (_fd < 0) {
- LOG_ERROR("Failed to open {} with flag {}, err = {}",
- _file_path,
- flag,
- utils::safe_strerror(errno));
- ::free(_buffer);
- _buffer = nullptr;
- return false;
- }
- return true;
-}
-
-bool direct_io_writable_file::finalize()
-{
- CHECK(_buffer && _fd >= 0, "Initialize the instance first");
-
- if (_offset > 0) {
- ssize_t written_bytes = ::write(_fd, _buffer, _buffer_size);
- if (dsn_unlikely(written_bytes < 0)) {
- LOG_ERROR("Failed to write the last chunk, file_path = {}, err =
{}",
- _file_path,
- utils::safe_strerror(errno));
- return false;
- }
- // TODO(yingchun): would better to retry
- if (dsn_unlikely(written_bytes != _buffer_size)) {
- LOG_ERROR("Failed to write the last chunk, file_path = {}, data
bytes = {}, written "
- "bytes = {}",
- _file_path,
- _buffer_size,
- written_bytes);
- return false;
- }
- _offset = 0;
- if (::ftruncate(_fd, _file_size) < 0) {
- LOG_ERROR("Failed to truncate {}, err = {}", _file_path,
utils::safe_strerror(errno));
- return false;
- }
- }
- return true;
-}
-
-bool direct_io_writable_file::write(const char *s, size_t n)
-{
- CHECK(_buffer && _fd >= 0, "Initialize the instance first");
-
- uint32_t remaining = n;
- while (remaining > 0) {
- uint32_t bytes = std::min((_buffer_size - _offset), remaining);
- memcpy((char *)_buffer + _offset, s, bytes);
- _offset += bytes;
- remaining -= bytes;
- s += bytes;
- // buffer is full, flush to file
- if (_offset == _buffer_size) {
- ssize_t written_bytes = ::write(_fd, _buffer, _buffer_size);
- if (dsn_unlikely(written_bytes < 0)) {
- LOG_ERROR("Failed to write chunk, file_path = {}, err = {}",
- _file_path,
- utils::safe_strerror(errno));
- return false;
- }
- // TODO(yingchun): would better to retry
- if (dsn_unlikely(written_bytes != _buffer_size)) {
- LOG_ERROR(
- "Failed to write chunk, file_path = {}, data bytes = {},
written bytes = {}",
- _file_path,
- _buffer_size,
- written_bytes);
- return false;
- }
- // reset offset
- _offset = 0;
- }
- }
- _file_size += n;
- return true;
-}
-
-} // namespace block_service
-} // namespace dist
-} // namespace dsn
diff --git a/src/block_service/directio_writable_file.h
b/src/block_service/directio_writable_file.h
deleted file mode 100644
index d4f99949f..000000000
--- a/src/block_service/directio_writable_file.h
+++ /dev/null
@@ -1,57 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stddef.h>
-#include <stdint.h>
-#include <string>
-
-#include "utils/ports.h"
-
-namespace dsn {
-namespace dist {
-namespace block_service {
-
-class direct_io_writable_file
-{
-public:
- explicit direct_io_writable_file(const std::string &file_path);
- ~direct_io_writable_file();
-
- bool initialize();
- bool write(const char *s, size_t n);
- bool finalize();
-
-private:
- DISALLOW_COPY_AND_ASSIGN(direct_io_writable_file);
-
- std::string _file_path;
- int _fd;
- uint32_t _file_size;
-
- // page size aligned buffer
- void *_buffer;
- // buffer size
- uint32_t _buffer_size;
- // buffer offset
- uint32_t _offset;
-};
-
-} // namespace block_service
-} // namespace dist
-} // namespace dsn
diff --git a/src/block_service/hdfs/CMakeLists.txt
b/src/block_service/hdfs/CMakeLists.txt
index 803e85bec..2bba8b96b 100644
--- a/src/block_service/hdfs/CMakeLists.txt
+++ b/src/block_service/hdfs/CMakeLists.txt
@@ -17,20 +17,16 @@
set(MY_PROJ_NAME dsn.block_service.hdfs)
-set(DIRECTIO_SRC
- ../directio_writable_file.cpp
- )
-
#Source files under CURRENT project directory will be automatically included.
#You can manually set MY_PROJ_SRC to include source files under other
directories.
-set(MY_PROJ_SRC "${DIRECTIO_SRC}")
+set(MY_PROJ_SRC "")
#Search mode for source files under CURRENT project directory ?
#"GLOB_RECURSE" for recursive search
#"GLOB" for non - recursive search
set(MY_SRC_SEARCH_MODE "GLOB")
-set(MY_PROJ_LIBS hdfs)
+set(MY_PROJ_LIBS hdfs rocksdb)
#Extra files that will be installed
set(MY_BINPLACES "")
diff --git a/src/block_service/hdfs/hdfs_service.cpp
b/src/block_service/hdfs/hdfs_service.cpp
index 459108e11..5909f40de 100644
--- a/src/block_service/hdfs/hdfs_service.cpp
+++ b/src/block_service/hdfs/hdfs_service.cpp
@@ -17,19 +17,21 @@
#include <errno.h>
#include <fcntl.h>
+#include <rocksdb/env.h>
#include <algorithm>
-#include <fstream>
#include <type_traits>
#include <utility>
-#include "block_service/directio_writable_file.h"
#include "hdfs/hdfs.h"
#include "hdfs_service.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
#include "runtime/task/async_calls.h"
#include "runtime/task/task.h"
#include "utils/TokenBucket.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
+#include "utils/env.h"
#include "utils/error_code.h"
#include "utils/filesystem.h"
#include "utils/flags.h"
@@ -37,6 +39,8 @@
#include "utils/safe_strerror_posix.h"
#include "utils/strings.h"
+DSN_DECLARE_bool(enable_direct_io);
+
struct hdfsBuilder;
namespace dsn {
@@ -65,8 +69,6 @@ DSN_DEFINE_uint64(replication,
"hdfs write batch size, the default value is 64MB");
DSN_TAG_VARIABLE(hdfs_write_batch_size_bytes, FT_MUTABLE);
-DSN_DECLARE_bool(enable_direct_io);
-
hdfs_service::hdfs_service()
{
_read_token_bucket.reset(new folly::DynamicTokenBucket());
@@ -108,12 +110,12 @@ error_code hdfs_service::create_fs()
hdfsBuilderSetNameNode(builder, _hdfs_name_node.c_str());
_fs = hdfsBuilderConnect(builder);
if (!_fs) {
- LOG_ERROR("Fail to connect hdfs name node {}, error: {}.",
+ LOG_ERROR("Fail to connect HDFS name node {}, error: {}.",
_hdfs_name_node,
utils::safe_strerror(errno));
return ERR_FS_INTERNAL;
}
- LOG_INFO("Succeed to connect hdfs name node {}.", _hdfs_name_node);
+ LOG_INFO("Succeed to connect HDFS name node {}.", _hdfs_name_node);
return ERR_OK;
}
@@ -122,10 +124,10 @@ void hdfs_service::close()
// This method should be carefully called.
// Calls to hdfsDisconnect() by individual threads would terminate
// all other connections handed out via hdfsConnect() to the same URI.
- LOG_INFO("Try to disconnect hdfs.");
+ LOG_INFO("Try to disconnect HDFS.");
int result = hdfsDisconnect(_fs);
if (result == -1) {
- LOG_ERROR("Fail to disconnect from the hdfs file system, error: {}.",
+ LOG_ERROR("Fail to disconnect from the HDFS file system, error: {}.",
utils::safe_strerror(errno));
}
// Even if there is an error, the resources associated with the hdfsFS
will be freed.
@@ -134,7 +136,7 @@ void hdfs_service::close()
std::string hdfs_service::get_hdfs_entry_name(const std::string &hdfs_path)
{
- // get exact file name from an hdfs path.
+ // get exact file name from an HDFS path.
int pos = hdfs_path.find_last_of("/");
return hdfs_path.substr(pos + 1);
}
@@ -305,7 +307,7 @@ error_code hdfs_file_object::write_data_in_batches(const
char *data,
hdfsFile write_file =
hdfsOpenFile(_service->get_fs(), file_name().c_str(), O_WRONLY |
O_CREAT, 0, 0, 0);
if (!write_file) {
- LOG_ERROR("Failed to open hdfs file {} for writting, error: {}.",
+ LOG_ERROR("Failed to open HDFS file {} for writting, error: {}.",
file_name(),
utils::safe_strerror(errno));
return ERR_FS_INTERNAL;
@@ -323,7 +325,7 @@ error_code hdfs_file_object::write_data_in_batches(const
char *data,
(void *)(data + cur_pos),
static_cast<tSize>(write_len));
if (num_written_bytes == -1) {
- LOG_ERROR("Failed to write hdfs file {}, error: {}.",
+ LOG_ERROR("Failed to write HDFS file {}, error: {}.",
file_name(),
utils::safe_strerror(errno));
hdfsCloseFile(_service->get_fs(), write_file);
@@ -333,18 +335,18 @@ error_code hdfs_file_object::write_data_in_batches(const
char *data,
}
if (hdfsHFlush(_service->get_fs(), write_file) != 0) {
LOG_ERROR(
- "Failed to flush hdfs file {}, error: {}.", file_name(),
utils::safe_strerror(errno));
+ "Failed to flush HDFS file {}, error: {}.", file_name(),
utils::safe_strerror(errno));
hdfsCloseFile(_service->get_fs(), write_file);
return ERR_FS_INTERNAL;
}
written_size = cur_pos;
if (hdfsCloseFile(_service->get_fs(), write_file) != 0) {
LOG_ERROR(
- "Failed to close hdfs file {}, error: {}", file_name(),
utils::safe_strerror(errno));
+ "Failed to close HDFS file {}, error: {}", file_name(),
utils::safe_strerror(errno));
return ERR_FS_INTERNAL;
}
- LOG_INFO("start to synchronize meta data after successfully wrote data to
hdfs");
+ LOG_INFO("start to synchronize meta data after successfully wrote data to
HDFS");
return get_file_meta();
}
@@ -376,23 +378,51 @@ dsn::task_ptr hdfs_file_object::upload(const
upload_request &req,
add_ref();
auto upload_background = [this, req, t]() {
+ LOG_INFO("start to upload from '{}' to '{}'", req.input_local_name,
file_name());
+
upload_response resp;
- resp.uploaded_size = 0;
- std::ifstream is(req.input_local_name, std::ios::binary |
std::ios::in);
- if (is.is_open()) {
- int64_t file_sz = 0;
- dsn::utils::filesystem::file_size(req.input_local_name, file_sz);
- std::unique_ptr<char[]> buffer(new char[file_sz]);
- is.read(buffer.get(), file_sz);
- is.close();
- resp.err = write_data_in_batches(buffer.get(), file_sz,
resp.uploaded_size);
- } else {
- LOG_ERROR("HDFS upload failed: open local file {} failed when
upload to {}, error: {}",
- req.input_local_name,
- file_name(),
- utils::safe_strerror(errno));
- resp.err = dsn::ERR_FILE_OPERATION_FAILED;
- }
+ do {
+ rocksdb::EnvOptions env_options;
+ env_options.use_direct_reads = FLAGS_enable_direct_io;
+ std::unique_ptr<rocksdb::SequentialFile> rfile;
+ auto s = rocksdb::Env::Default()->NewSequentialFile(
+ req.input_local_name, &rfile, env_options);
+ if (!s.ok()) {
+ LOG_ERROR(
+ "open local file '{}' failed, err = {}",
req.input_local_name, s.ToString());
+ resp.err = ERR_FILE_OPERATION_FAILED;
+ break;
+ }
+
+ int64_t file_size;
+ if (!dsn::utils::filesystem::file_size(
+ req.input_local_name,
dsn::utils::FileDataType::kSensitive, file_size)) {
+ LOG_ERROR("get size of local file '{}' failed",
req.input_local_name);
+ resp.err = ERR_FILE_OPERATION_FAILED;
+ break;
+ }
+
+ rocksdb::Slice result;
+ char scratch[file_size];
+ s = rfile->Read(file_size, &result, scratch);
+ if (!s.ok()) {
+ LOG_ERROR(
+ "read local file '{}' failed, err = {}",
req.input_local_name, s.ToString());
+ resp.err = ERR_FILE_OPERATION_FAILED;
+ break;
+ }
+
+ resp.err = write_data_in_batches(result.data(), result.size(),
resp.uploaded_size);
+ if (resp.err != ERR_OK) {
+ LOG_ERROR("write data to remote '{}' failed, err = {}",
file_name(), resp.err);
+ break;
+ }
+
+ LOG_INFO("finish to upload from '{}' to '{}', size = {}",
+ req.input_local_name,
+ file_name(),
+ resp.uploaded_size);
+ } while (false);
t->enqueue_with(resp);
release_ref();
};
@@ -417,7 +447,7 @@ error_code hdfs_file_object::read_data_in_batches(uint64_t
start_pos,
hdfsFile read_file = hdfsOpenFile(_service->get_fs(), file_name().c_str(),
O_RDONLY, 0, 0, 0);
if (!read_file) {
- LOG_ERROR("Failed to open hdfs file {} for reading, error: {}.",
+ LOG_ERROR("Failed to open HDFS file {} for reading, error: {}.",
file_name(),
utils::safe_strerror(errno));
return ERR_FS_INTERNAL;
@@ -446,7 +476,7 @@ error_code hdfs_file_object::read_data_in_batches(uint64_t
start_pos,
cur_pos += num_read_bytes;
dst_buf += num_read_bytes;
} else if (num_read_bytes == -1) {
- LOG_ERROR("Failed to read hdfs file {}, error: {}.",
+ LOG_ERROR("Failed to read HDFS file {}, error: {}.",
file_name(),
utils::safe_strerror(errno));
read_success = false;
@@ -455,7 +485,7 @@ error_code hdfs_file_object::read_data_in_batches(uint64_t
start_pos,
}
if (hdfsCloseFile(_service->get_fs(), read_file) != 0) {
LOG_ERROR(
- "Failed to close hdfs file {}, error: {}.", file_name(),
utils::safe_strerror(errno));
+ "Failed to close HDFS file {}, error: {}.", file_name(),
utils::safe_strerror(errno));
return ERR_FS_INTERNAL;
}
if (read_success) {
@@ -504,48 +534,53 @@ dsn::task_ptr hdfs_file_object::download(const
download_request &req,
auto download_background = [this, req, t]() {
download_response resp;
resp.downloaded_size = 0;
- std::string read_buffer;
- size_t read_length = 0;
- resp.err =
- read_data_in_batches(req.remote_pos, req.remote_length,
read_buffer, read_length);
- if (resp.err == ERR_OK) {
- bool write_succ = false;
- if (FLAGS_enable_direct_io) {
- auto dio_file =
std::make_unique<direct_io_writable_file>(req.output_local_name);
- do {
- if (!dio_file->initialize()) {
- break;
- }
- bool wr_ret = dio_file->write(read_buffer.c_str(),
read_length);
- if (!wr_ret) {
- break;
- }
- if (dio_file->finalize()) {
- resp.downloaded_size = read_length;
- resp.file_md5 = utils::string_md5(read_buffer.c_str(),
read_length);
- write_succ = true;
- }
- } while (0);
- } else {
- std::ofstream out(req.output_local_name,
- std::ios::binary | std::ios::out |
std::ios::trunc);
- if (out.is_open()) {
- out.write(read_buffer.c_str(), read_length);
- out.close();
- resp.downloaded_size = read_length;
- resp.file_md5 = utils::string_md5(read_buffer.c_str(),
read_length);
- write_succ = true;
- }
+ resp.err = ERR_OK;
+ bool write_succ = false;
+ std::string target_file = req.output_local_name;
+ do {
+ LOG_INFO("start to download from '{}' to '{}'", file_name(),
target_file);
+
+ std::string read_buffer;
+ size_t read_length = 0;
+ resp.err =
+ read_data_in_batches(req.remote_pos, req.remote_length,
read_buffer, read_length);
+ if (resp.err != ERR_OK) {
+ LOG_ERROR("read data from remote '{}' failed, err = {}",
file_name(), resp.err);
+ break;
}
- if (!write_succ) {
- LOG_ERROR("HDFS download failed: fail to open localfile {}
when download {}, "
- "error: {}",
- req.output_local_name,
- file_name(),
- utils::safe_strerror(errno));
- resp.err = ERR_FILE_OPERATION_FAILED;
- resp.downloaded_size = 0;
+
+ rocksdb::EnvOptions env_options;
+ env_options.use_direct_writes = FLAGS_enable_direct_io;
+ std::unique_ptr<rocksdb::WritableFile> wfile;
+ auto s = rocksdb::Env::Default()->NewWritableFile(target_file,
&wfile, env_options);
+ if (!s.ok()) {
+ LOG_ERROR("create local file '{}' failed, err = {}",
target_file, s.ToString());
+ break;
+ }
+
+ s = wfile->Append(rocksdb::Slice(read_buffer.data(), read_length));
+ if (!s.ok()) {
+ LOG_ERROR("append local file '{}' failed, err = {}",
target_file, s.ToString());
+ break;
+ }
+
+ s = wfile->Fsync();
+ if (!s.ok()) {
+ LOG_ERROR("fsync local file '{}' failed, err = {}",
target_file, s.ToString());
+ break;
}
+
+ resp.downloaded_size = read_length;
+ resp.file_md5 = utils::string_md5(read_buffer.c_str(),
read_length);
+ write_succ = true;
+ } while (false);
+
+ if (!write_succ) {
+ LOG_ERROR("HDFS download failed: fail to write local file {} when
download {}",
+ target_file,
+ file_name());
+ resp.err = ERR_FILE_OPERATION_FAILED;
+ resp.downloaded_size = 0;
}
t->enqueue_with(resp);
release_ref();
diff --git a/src/block_service/test/config-test.ini
b/src/block_service/test/config-test.ini
index c1996d551..2acb86e2b 100644
--- a/src/block_service/test/config-test.ini
+++ b/src/block_service/test/config-test.ini
@@ -53,7 +53,7 @@ max_size = 150
worker_count = 8
[hdfs_test]
-test_name_node = <hdfs_name_none>
-test_backup_path = <hdfs_path>
+test_name_node =
+test_backup_path =
num_test_file_lines = 4096
num_total_files_for_hdfs_concurrent_test = 64
diff --git a/src/block_service/test/hdfs_service_test.cpp
b/src/block_service/test/hdfs_service_test.cpp
index 6cfcfb379..d7b33a8da 100644
--- a/src/block_service/test/hdfs_service_test.cpp
+++ b/src/block_service/test/hdfs_service_test.cpp
@@ -15,15 +15,20 @@
// specific language governing permissions and limitations
// under the License.
+#include <fmt/core.h>
+#include <gtest/gtest-param-test.h>
// IWYU pragma: no_include <gtest/gtest-message.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
+#include <rocksdb/env.h>
+#include <rocksdb/slice.h>
+#include <rocksdb/status.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <algorithm>
#include <cstdint>
-#include <fstream>
+#include <iostream>
#include <memory>
#include <string>
#include <vector>
@@ -35,27 +40,23 @@
#include "runtime/task/task.h"
#include "runtime/task/task_code.h"
#include "runtime/task/task_tracker.h"
+#include "test_util/test_util.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/enum_helper.h"
+#include "utils/env.h"
#include "utils/error_code.h"
#include "utils/filesystem.h"
#include "utils/flags.h"
-#include "utils/strings.h"
+#include "utils/fmt_logging.h"
+#include "utils/test_macros.h"
#include "utils/threadpool_code.h"
using namespace dsn;
using namespace dsn::dist::block_service;
-static std::string example_name_node = "<hdfs_name_none>";
-static std::string example_backup_path = "<hdfs_path>";
-// Please modify following paras in 'config-test.ini' to enable
hdfs_service_test,
-// or hdfs_service_test will be skipped and return true.
-DSN_DEFINE_string(hdfs_test, test_name_node, "<hdfs_name_none>", "hdfs name
node");
-DSN_DEFINE_string(hdfs_test,
- test_backup_path,
- "<hdfs_path>",
- "path for uploading and downloading test files");
+DSN_DEFINE_string(hdfs_test, test_name_node, "", "hdfs name node");
+DSN_DEFINE_string(hdfs_test, test_backup_path, "", "path for uploading and
downloading test files");
DSN_DEFINE_uint32(hdfs_test, num_test_file_lines, 4096, "number of lines in
test file");
DSN_DEFINE_uint32(hdfs_test,
@@ -65,206 +66,257 @@ DSN_DEFINE_uint32(hdfs_test,
DEFINE_TASK_CODE(LPC_TEST_HDFS, TASK_PRIORITY_HIGH, dsn::THREAD_POOL_DEFAULT)
-class HDFSClientTest : public testing::Test
+class HDFSClientTest : public pegasus::encrypt_data_test_base
{
protected:
- virtual void SetUp() override;
- virtual void TearDown() override;
- void generate_test_file(const char *filename);
- void write_test_files_async(task_tracker *tracker);
- std::string name_node;
- std::string backup_path;
- std::string local_test_dir;
- std::string test_data_str;
+ void generate_test_file(const std::string &filename);
+ void write_test_files_async(const std::string &local_test_path,
+ int total_files,
+ int *success_count,
+ task_tracker *tracker);
};
-void HDFSClientTest::SetUp()
+void HDFSClientTest::generate_test_file(const std::string &filename)
{
- name_node = FLAGS_test_name_node;
- backup_path = FLAGS_test_backup_path;
- local_test_dir = "test_dir";
- test_data_str = "";
- for (int i = 0; i < FLAGS_num_test_file_lines; ++i) {
- test_data_str += "test";
+ int lines = FLAGS_num_test_file_lines;
+ std::unique_ptr<rocksdb::WritableFile> wfile;
+ auto s = rocksdb::Env::Default()->NewWritableFile(filename, &wfile,
rocksdb::EnvOptions());
+ ASSERT_TRUE(s.ok()) << s.ToString();
+ for (int i = 0; i < lines; ++i) {
+ rocksdb::Slice data(fmt::format("{:04}d_this_is_a_simple_test_file\n",
i));
+ s = wfile->Append(data);
+ ASSERT_TRUE(s.ok()) << s.ToString();
}
+ s = wfile->Fsync();
+ ASSERT_TRUE(s.ok()) << s.ToString();
}
-void HDFSClientTest::TearDown() {}
-
-void HDFSClientTest::generate_test_file(const char *filename)
+void HDFSClientTest::write_test_files_async(const std::string &local_test_path,
+ int total_files,
+ int *success_count,
+ task_tracker *tracker)
{
- // generate a local test file.
- int lines = FLAGS_num_test_file_lines;
- FILE *fp = fopen(filename, "wb");
- for (int i = 0; i < lines; ++i) {
- fprintf(fp, "%04d_this_is_a_simple_test_file\n", i);
+ dsn::utils::filesystem::create_directory(local_test_path);
+ std::string local_test_data;
+ for (int i = 0; i < FLAGS_num_test_file_lines; ++i) {
+ local_test_data += "test";
+ }
+ for (int i = 0; i < total_files; ++i) {
+ tasking::enqueue(
+ LPC_TEST_HDFS, tracker, [&local_test_path, &local_test_data, i,
success_count]() {
+ // mock the writing process in hdfs_file_object::download().
+ std::string test_file_name = local_test_path + "/test_file_" +
std::to_string(i);
+ auto s = rocksdb::WriteStringToFile(rocksdb::Env::Default(),
+
rocksdb::Slice(local_test_data),
+ test_file_name,
+ /* should_sync */ true);
+ if (s.ok()) {
+ ++(*success_count);
+ } else {
+ CHECK(s.IsIOError(), "{}", s.ToString());
+ auto pos1 = s.ToString().find(
+ "IO error: No such file or directory: While open a
file for appending: ");
+ auto pos2 = s.ToString().find("IO error: While appending
to file: ");
+ CHECK(pos1 == 0 || pos2 == 0, "{}", s.ToString());
+ }
+ });
}
- fclose(fp);
}
-void HDFSClientTest::write_test_files_async(task_tracker *tracker)
+// TODO(yingchun): ENCRYPTION: add enable encryption test.
+INSTANTIATE_TEST_CASE_P(, HDFSClientTest, ::testing::Values(false));
+
+TEST_P(HDFSClientTest, test_hdfs_read_write)
{
- dsn::utils::filesystem::create_directory(local_test_dir);
- for (int i = 0; i < 100; ++i) {
- tasking::enqueue(LPC_TEST_HDFS, tracker, [this, i]() {
- // mock the writing process in hdfs_file_object::download().
- std::string test_file_name = local_test_dir + "/test_file_" +
std::to_string(i);
- std::ofstream out(test_file_name, std::ios::binary | std::ios::out
| std::ios::trunc);
- if (out.is_open()) {
- out.write(test_data_str.c_str(), test_data_str.length());
- }
- out.close();
- });
+ if (strlen(FLAGS_test_name_node) == 0 || strlen(FLAGS_test_backup_path) ==
0) {
+ // TODO(yingchun): use GTEST_SKIP after upgrading gtest.
+ std::cout << "Set hdfs_test.* configs in config-test.ini to enable
hdfs_service_test."
+ << std::endl;
+ return;
}
+
+ auto s = std::make_shared<hdfs_service>();
+ ASSERT_EQ(dsn::ERR_OK, s->initialize({FLAGS_test_name_node,
FLAGS_test_backup_path}));
+
+ const std::string kRemoteTestPath = "hdfs_client_test";
+ const std::string kRemoteTestRWFile = kRemoteTestPath + "/test_write_file";
+ const std::string kTestBuffer = "write_hello_world_for_test";
+ const int kTestBufferLength = kTestBuffer.size();
+
+    // 1. clean up all old files in the remote test directory.
+ printf("clean up all old files.\n");
+ remove_path_response rem_resp;
+ s->remove_path(remove_path_request{kRemoteTestPath, true},
+ LPC_TEST_HDFS,
+ [&rem_resp](const remove_path_response &resp) { rem_resp =
resp; },
+ nullptr)
+ ->wait();
+ ASSERT_TRUE(dsn::ERR_OK == rem_resp.err || dsn::ERR_OBJECT_NOT_FOUND ==
rem_resp.err);
+
+ // 2. create file.
+ printf("test write operation.\n");
+ create_file_response cf_resp;
+ s->create_file(create_file_request{kRemoteTestRWFile, false},
+ LPC_TEST_HDFS,
+ [&cf_resp](const create_file_response &r) { cf_resp = r; },
+ nullptr)
+ ->wait();
+ ASSERT_EQ(dsn::ERR_OK, cf_resp.err);
+
+ // 3. write file.
+ dsn::blob bb(kTestBuffer.c_str(), 0, kTestBufferLength);
+ write_response w_resp;
+ cf_resp.file_handle
+ ->write(write_request{bb},
+ LPC_TEST_HDFS,
+ [&w_resp](const write_response &w) { w_resp = w; },
+ nullptr)
+ ->wait();
+ ASSERT_EQ(dsn::ERR_OK, w_resp.err);
+ ASSERT_EQ(kTestBufferLength, w_resp.written_size);
+ ASSERT_EQ(kTestBufferLength, cf_resp.file_handle->get_size());
+
+ // 4. read file.
+ printf("test read just written contents.\n");
+ read_response r_resp;
+ cf_resp.file_handle
+ ->read(read_request{0, -1},
+ LPC_TEST_HDFS,
+ [&r_resp](const read_response &r) { r_resp = r; },
+ nullptr)
+ ->wait();
+ ASSERT_EQ(dsn::ERR_OK, r_resp.err);
+ ASSERT_EQ(kTestBufferLength, r_resp.buffer.length());
+ ASSERT_EQ(kTestBuffer, r_resp.buffer.to_string());
+
+ // 5. partial read.
+ const uint64_t kOffset = 5;
+ const int64_t kSize = 10;
+ cf_resp.file_handle
+ ->read(read_request{kOffset, kSize},
+ LPC_TEST_HDFS,
+ [&r_resp](const read_response &r) { r_resp = r; },
+ nullptr)
+ ->wait();
+ ASSERT_EQ(dsn::ERR_OK, r_resp.err);
+ ASSERT_EQ(kSize, r_resp.buffer.length());
+ ASSERT_EQ(kTestBuffer.substr(kOffset, kSize), r_resp.buffer.to_string());
}
-TEST_F(HDFSClientTest, test_basic_operation)
+TEST_P(HDFSClientTest, test_upload_and_download)
{
- if (name_node == example_name_node || backup_path == example_backup_path) {
+ if (strlen(FLAGS_test_name_node) == 0 || strlen(FLAGS_test_backup_path) ==
0) {
+ // TODO(yingchun): use GTEST_SKIP after upgrading gtest.
+ std::cout << "Set hdfs_test.* configs in config-test.ini to enable
hdfs_service_test."
+ << std::endl;
return;
}
- std::vector<std::string> args = {name_node, backup_path};
- std::shared_ptr<hdfs_service> s = std::make_shared<hdfs_service>();
- ASSERT_EQ(dsn::ERR_OK, s->initialize(args));
+ auto s = std::make_shared<hdfs_service>();
+ ASSERT_EQ(dsn::ERR_OK, s->initialize({FLAGS_test_name_node,
FLAGS_test_backup_path}));
- std::string local_test_file = "test_file";
- std::string remote_test_file = "hdfs_client_test/test_file";
- int64_t test_file_size = 0;
+ const std::string kLocalFile = "test_file";
+ const std::string kRemoteTestPath = "hdfs_client_test";
+ const std::string kRemoteTestFile = kRemoteTestPath + "/test_file";
- generate_test_file(local_test_file.c_str());
- dsn::utils::filesystem::file_size(local_test_file, test_file_size);
+ // 0. generate test file.
+ NO_FATALS(generate_test_file(kLocalFile));
+ int64_t local_file_size = 0;
+ ASSERT_TRUE(dsn::utils::filesystem::file_size(
+ kLocalFile, dsn::utils::FileDataType::kSensitive, local_file_size));
+ std::string local_file_md5sum;
+ dsn::utils::filesystem::md5sum(kLocalFile, local_file_md5sum);
- // fisrt clean up all old file in test directory.
+    // 1. clean up all old files in the remote test directory.
printf("clean up all old files.\n");
remove_path_response rem_resp;
- s->remove_path(remove_path_request{"hdfs_client_test", true},
+ s->remove_path(remove_path_request{kRemoteTestPath, true},
LPC_TEST_HDFS,
[&rem_resp](const remove_path_response &resp) { rem_resp =
resp; },
nullptr)
->wait();
ASSERT_TRUE(dsn::ERR_OK == rem_resp.err || dsn::ERR_OBJECT_NOT_FOUND ==
rem_resp.err);
- // test upload file.
- printf("create and upload: %s.\n", remote_test_file.c_str());
+ // 2. create file.
+ printf("create and upload: %s.\n", kRemoteTestFile.c_str());
create_file_response cf_resp;
- s->create_file(create_file_request{remote_test_file, true},
+ s->create_file(create_file_request{kRemoteTestFile, true},
LPC_TEST_HDFS,
[&cf_resp](const create_file_response &r) { cf_resp = r; },
nullptr)
->wait();
- ASSERT_EQ(cf_resp.err, dsn::ERR_OK);
+ ASSERT_EQ(dsn::ERR_OK, cf_resp.err);
+
+ // 3. upload file.
upload_response u_resp;
cf_resp.file_handle
- ->upload(upload_request{local_test_file},
+ ->upload(upload_request{kLocalFile},
LPC_TEST_HDFS,
[&u_resp](const upload_response &r) { u_resp = r; },
nullptr)
->wait();
ASSERT_EQ(dsn::ERR_OK, u_resp.err);
- ASSERT_EQ(test_file_size, cf_resp.file_handle->get_size());
+ ASSERT_EQ(local_file_size, cf_resp.file_handle->get_size());
- // test list directory.
+ // 4. list directory.
ls_response l_resp;
- s->list_dir(ls_request{"hdfs_client_test"},
+ s->list_dir(ls_request{kRemoteTestPath},
LPC_TEST_HDFS,
[&l_resp](const ls_response &resp) { l_resp = resp; },
nullptr)
->wait();
ASSERT_EQ(dsn::ERR_OK, l_resp.err);
ASSERT_EQ(1, l_resp.entries->size());
- ASSERT_EQ("test_file", l_resp.entries->at(0).entry_name);
+ ASSERT_EQ(kLocalFile, l_resp.entries->at(0).entry_name);
ASSERT_EQ(false, l_resp.entries->at(0).is_directory);
- // test download file.
+ // 5. download file.
download_response d_resp;
- printf("test download %s.\n", remote_test_file.c_str());
- s->create_file(create_file_request{remote_test_file, false},
+ printf("test download %s.\n", kRemoteTestFile.c_str());
+ s->create_file(create_file_request{kRemoteTestFile, false},
LPC_TEST_HDFS,
[&cf_resp](const create_file_response &resp) { cf_resp =
resp; },
nullptr)
->wait();
ASSERT_EQ(dsn::ERR_OK, cf_resp.err);
- ASSERT_EQ(test_file_size, cf_resp.file_handle->get_size());
- std::string local_file_for_download = "test_file_d";
+ ASSERT_EQ(local_file_size, cf_resp.file_handle->get_size());
+ std::string kLocalDownloadFile = "test_file_d";
cf_resp.file_handle
- ->download(download_request{local_file_for_download, 0, -1},
+ ->download(download_request{kLocalDownloadFile, 0, -1},
LPC_TEST_HDFS,
[&d_resp](const download_response &resp) { d_resp = resp; },
nullptr)
->wait();
ASSERT_EQ(dsn::ERR_OK, d_resp.err);
- ASSERT_EQ(test_file_size, d_resp.downloaded_size);
-
- // compare local_test_file and local_file_for_download.
- int64_t file_size = 0;
- dsn::utils::filesystem::file_size(local_file_for_download, file_size);
- ASSERT_EQ(test_file_size, file_size);
- std::string test_file_md5sum;
- dsn::utils::filesystem::md5sum(local_test_file, test_file_md5sum);
- std::string downloaded_file_md5sum;
- dsn::utils::filesystem::md5sum(local_file_for_download,
downloaded_file_md5sum);
- ASSERT_EQ(test_file_md5sum, downloaded_file_md5sum);
-
- // test read and write.
- printf("test read write operation.\n");
- std::string test_write_file = "hdfs_client_test/test_write_file";
- s->create_file(create_file_request{test_write_file, false},
- LPC_TEST_HDFS,
- [&cf_resp](const create_file_response &r) { cf_resp = r; },
- nullptr)
- ->wait();
- ASSERT_EQ(dsn::ERR_OK, cf_resp.err);
- const char *test_buffer = "write_hello_world_for_test";
- int length = strlen(test_buffer);
- dsn::blob bb(test_buffer, 0, length);
- write_response w_resp;
- cf_resp.file_handle
- ->write(write_request{bb},
- LPC_TEST_HDFS,
- [&w_resp](const write_response &w) { w_resp = w; },
- nullptr)
- ->wait();
- ASSERT_EQ(dsn::ERR_OK, w_resp.err);
- ASSERT_EQ(length, w_resp.written_size);
- ASSERT_EQ(length, cf_resp.file_handle->get_size());
- printf("test read just written contents.\n");
- read_response r_resp;
- cf_resp.file_handle
- ->read(read_request{0, -1},
- LPC_TEST_HDFS,
- [&r_resp](const read_response &r) { r_resp = r; },
- nullptr)
- ->wait();
- ASSERT_EQ(dsn::ERR_OK, r_resp.err);
- ASSERT_EQ(length, r_resp.buffer.length());
- ASSERT_TRUE(dsn::utils::mequals(r_resp.buffer.data(), test_buffer,
length));
-
- // test partitial read.
- cf_resp.file_handle
- ->read(read_request{5, 10},
- LPC_TEST_HDFS,
- [&r_resp](const read_response &r) { r_resp = r; },
- nullptr)
- ->wait();
- ASSERT_EQ(dsn::ERR_OK, r_resp.err);
- ASSERT_EQ(10, r_resp.buffer.length());
- ASSERT_TRUE(dsn::utils::mequals(r_resp.buffer.data(), test_buffer + 5,
10));
-
- // clean up local test files.
- utils::filesystem::remove_path(local_test_file);
- utils::filesystem::remove_path(local_file_for_download);
+ ASSERT_EQ(local_file_size, d_resp.downloaded_size);
+
+ // 6. compare kLocalFile and kLocalDownloadFile.
+ // 6.1 check file size.
+ int64_t local_download_file_size = 0;
+ ASSERT_TRUE(dsn::utils::filesystem::file_size(
+ kLocalDownloadFile, dsn::utils::FileDataType::kSensitive,
local_download_file_size));
+ ASSERT_EQ(local_file_size, local_download_file_size);
+ // 6.2 check file md5sum.
+ std::string local_download_file_md5sum;
+ dsn::utils::filesystem::md5sum(kLocalDownloadFile,
local_download_file_md5sum);
+ ASSERT_EQ(local_file_md5sum, local_download_file_md5sum);
+
+ // 7. clean up local test files.
+ utils::filesystem::remove_path(kLocalFile);
+ utils::filesystem::remove_path(kLocalDownloadFile);
}
-TEST_F(HDFSClientTest, test_concurrent_upload_download)
+TEST_P(HDFSClientTest, test_concurrent_upload_download)
{
- if (name_node == example_name_node || backup_path == example_backup_path) {
+ if (strlen(FLAGS_test_name_node) == 0 || strlen(FLAGS_test_backup_path) ==
0) {
+ // TODO(yingchun): use GTEST_SKIP after upgrading gtest.
+ std::cout << "Set hdfs_test.* configs in config-test.ini to enable
hdfs_service_test."
+ << std::endl;
return;
}
- std::vector<std::string> args = {name_node, backup_path};
- std::shared_ptr<hdfs_service> s = std::make_shared<hdfs_service>();
- ASSERT_EQ(dsn::ERR_OK, s->initialize(args));
+ auto s = std::make_shared<hdfs_service>();
+ ASSERT_EQ(dsn::ERR_OK, s->initialize({FLAGS_test_name_node,
FLAGS_test_backup_path}));
int total_files = FLAGS_num_total_files_for_hdfs_concurrent_test;
std::vector<std::string> local_file_names;
@@ -282,11 +334,12 @@ TEST_F(HDFSClientTest, test_concurrent_upload_download)
// generate test files.
for (int i = 0; i < total_files; ++i) {
std::string file_name = "randomfile" + std::to_string(i);
- generate_test_file(file_name.c_str());
+ NO_FATALS(generate_test_file(file_name));
int64_t file_size = 0;
- dsn::utils::filesystem::file_size(file_name, file_size);
+ ASSERT_TRUE(dsn::utils::filesystem::file_size(
+ file_name, dsn::utils::FileDataType::kSensitive, file_size));
std::string md5sum;
- dsn::utils::filesystem::md5sum(file_name, md5sum);
+ ASSERT_EQ(ERR_OK, dsn::utils::filesystem::md5sum(file_name, md5sum));
local_file_names.emplace_back(file_name);
remote_file_names.emplace_back("hdfs_concurrent_test/" + file_name);
@@ -385,13 +438,22 @@ TEST_F(HDFSClientTest, test_concurrent_upload_download)
}
}
-TEST_F(HDFSClientTest, test_rename_path_while_writing)
+TEST_P(HDFSClientTest, test_rename_path_while_writing)
{
+ std::string kLocalTestPath = "test_dir";
+ const int kTotalFiles = 100;
+
task_tracker tracker;
- write_test_files_async(&tracker);
+ int success_count = 0;
+ write_test_files_async(kLocalTestPath, kTotalFiles, &success_count,
&tracker);
usleep(100);
- std::string rename_dir = "rename_dir." + std::to_string(dsn_now_ms());
- // rename succeed but writing failed.
- ASSERT_TRUE(dsn::utils::filesystem::rename_path(local_test_dir,
rename_dir));
+
+ std::string kLocalRenamedTestPath = "rename_dir." +
std::to_string(dsn_now_ms());
+ // Rename succeed but some files write failed.
+ ASSERT_TRUE(dsn::utils::filesystem::rename_path(kLocalTestPath,
kLocalRenamedTestPath));
tracker.cancel_outstanding_tasks();
+ // Generally, we can assume that some of the files failed to be written.
+ // It may be flaky; please retry if it fails.
+ ASSERT_GT(success_count, 0) << success_count;
+ ASSERT_LT(success_count, kTotalFiles) << success_count;
}
diff --git a/src/replica/CMakeLists.txt b/src/replica/CMakeLists.txt
index 7e27ef9b7..d4609d9e5 100644
--- a/src/replica/CMakeLists.txt
+++ b/src/replica/CMakeLists.txt
@@ -75,7 +75,7 @@ set(MY_PROJ_LIBS
PocoFoundation
PocoNetSSL
PocoJSON
- )
+ rocksdb)
set(MY_BOOST_LIBS Boost::filesystem Boost::regex)
diff --git a/src/utils/env.cpp b/src/utils/env.cpp
index 329406117..7e49d999a 100644
--- a/src/utils/env.cpp
+++ b/src/utils/env.cpp
@@ -49,6 +49,12 @@ DSN_DEFINE_string(pegasus.server,
"The encryption method to use in the filesystem. Now "
"supports AES128CTR, AES192CTR, AES256CTR and SM4CTR.");
+DSN_DEFINE_bool(replication,
+ enable_direct_io,
+ false,
+ "Whether to enable direct I/O when downloading files");
+DSN_TAG_VARIABLE(enable_direct_io, FT_MUTABLE);
+
namespace dsn {
namespace utils {
@@ -89,6 +95,7 @@ rocksdb::Status do_copy_file(const std::string &src_fname,
uint64_t *total_size)
{
rocksdb::EnvOptions src_env_options;
+ src_env_options.use_direct_reads = FLAGS_enable_direct_io;
std::unique_ptr<rocksdb::SequentialFile> src_file;
auto s =
dsn::utils::PegasusEnv(src_type)->NewSequentialFile(src_fname,
&src_file, src_env_options);
@@ -105,6 +112,7 @@ rocksdb::Status do_copy_file(const std::string &src_fname,
}
rocksdb::EnvOptions dst_env_options;
+ dst_env_options.use_direct_writes = FLAGS_enable_direct_io;
std::unique_ptr<rocksdb::WritableFile> dst_file;
s = dsn::utils::PegasusEnv(dst_type)->NewWritableFile(dst_fname,
&dst_file, dst_env_options);
LOG_AND_RETURN_NOT_RDB_OK(WARNING, s, "failed to open file {} for
writing", dst_fname);
diff --git a/src/utils/env.h b/src/utils/env.h
index 839bd81ab..cdaf16aaa 100644
--- a/src/utils/env.h
+++ b/src/utils/env.h
@@ -19,8 +19,8 @@
#include <rocksdb/env.h>
#include <rocksdb/status.h>
-#include <stddef.h>
-#include <stdint.h>
+#include <cstddef>
+#include <cstdint>
#include <string>
namespace dsn {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]