This is an automated email from the ASF dual-hosted git repository.
laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new f1fee10dd refactor(aio_test): refactor aio unit test (#1617)
f1fee10dd is described below
commit f1fee10dd56f3cc6a485a9ff7878b0a31cc6f2e8
Author: Yingchun Lai <[email protected]>
AuthorDate: Mon Sep 25 11:28:39 2023 +0800
refactor(aio_test): refactor aio unit test (#1617)
https://github.com/apache/incubator-pegasus/issues/887
There is no functional changes, but only refactor the aio unit test.
The `aio_pwrite_incomplete` fail point injection has been removed, it's
useless in
the following aio refactor.
---
src/aio/native_linux_aio_provider.cpp | 9 -
src/aio/test/aio.cpp | 413 ++++++++++++++++++++++------------
2 files changed, 273 insertions(+), 149 deletions(-)
diff --git a/src/aio/native_linux_aio_provider.cpp
b/src/aio/native_linux_aio_provider.cpp
index 470f3d63a..56a0efa1e 100644
--- a/src/aio/native_linux_aio_provider.cpp
+++ b/src/aio/native_linux_aio_provider.cpp
@@ -35,12 +35,10 @@
#include "aio/disk_engine.h"
#include "runtime/service_engine.h"
#include "runtime/task/async_calls.h"
-#include "utils/fail_point.h"
#include "utils/fmt_logging.h"
#include "utils/latency_tracer.h"
#include "utils/ports.h"
#include "utils/safe_strerror_posix.h"
-#include "utils/string_view.h"
namespace dsn {
@@ -99,13 +97,6 @@ error_code native_linux_aio_provider::write(const
aio_context &aio_ctx,
return resp;
}
- // mock the `ret` to reproduce the `write incomplete` case in the
first write
- FAIL_POINT_INJECT_NOT_RETURN_F("aio_pwrite_incomplete",
[&](string_view s) -> void {
- if (dsn_unlikely(buffer_offset == 0)) {
- --ret;
- }
- });
-
buffer_offset += ret;
if (dsn_unlikely(buffer_offset != aio_ctx.buffer_size)) {
LOG_WARNING(
diff --git a/src/aio/test/aio.cpp b/src/aio/test/aio.cpp
index 703799652..a95cbeb8f 100644
--- a/src/aio/test/aio.cpp
+++ b/src/aio/test/aio.cpp
@@ -24,156 +24,264 @@
* THE SOFTWARE.
*/
-#include <alloca.h>
#include <fcntl.h>
+// IWYU pragma: no_include <gtest/gtest-param-test.h>
// IWYU pragma: no_include <gtest/gtest-message.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
#include <gtest/gtest.h>
-#include <stdint.h>
+#include <rocksdb/status.h>
#include <string.h>
+#include <algorithm>
+#include <cstdint>
#include <list>
#include <memory>
+#include <random>
#include <string>
+#include <vector>
#include "aio/aio_task.h"
#include "aio/file_io.h"
#include "runtime/task/task_code.h"
#include "runtime/tool_api.h"
+#include "test_util/test_util.h"
#include "utils/autoref_ptr.h"
+#include "utils/env.h"
#include "utils/error_code.h"
-#include "utils/fail_point.h"
#include "utils/filesystem.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"
-#include "utils/strings.h"
+#include "utils/test_macros.h"
#include "utils/threadpool_code.h"
-namespace dsn {
-class disk_file;
-} // namespace dsn
-
using namespace ::dsn;
DEFINE_THREAD_POOL_CODE(THREAD_POOL_TEST_SERVER)
DEFINE_TASK_CODE_AIO(LPC_AIO_TEST, TASK_PRIORITY_COMMON,
THREAD_POOL_TEST_SERVER);
-TEST(core, aio)
+class aio_test : public pegasus::encrypt_data_test_base
{
- fail::setup();
- fail::cfg("aio_pwrite_incomplete", "void()");
- const char *buffer = "hello, world";
- int len = (int)strlen(buffer);
+public:
+ void SetUp() override { utils::filesystem::remove_path(kTestFileName); }
- // write
- auto fp = file::open("tmp", O_RDWR | O_CREAT | O_BINARY, 0666);
+ const std::string kTestFileName = "aio_test.txt";
+};
- std::list<aio_task_ptr> tasks;
- uint64_t offset = 0;
+// TODO(yingchun): ENCRYPTION: add enable encryption test.
+INSTANTIATE_TEST_CASE_P(, aio_test, ::testing::Values(false));
- // new write
- for (int i = 0; i < 100; i++) {
- auto t = ::dsn::file::write(fp, buffer, len, offset, LPC_AIO_TEST,
nullptr, nullptr);
- tasks.push_back(t);
- offset += len;
- }
+TEST_P(aio_test, basic)
+{
+ const char *kUnitBuffer = "hello, world";
+ const size_t kUnitBufferLength = strlen(kUnitBuffer);
+ const int kTotalBufferCount = 100;
+ const int kBufferCountPerBatch = 10;
+ const int64_t kFileSize = kUnitBufferLength * kTotalBufferCount;
+ ASSERT_EQ(0, kTotalBufferCount % kBufferCountPerBatch);
+
+ auto check_callback = [kUnitBufferLength](::dsn::error_code err, size_t n)
{
+ // Use CHECK_* instead of ASSERT_* to exit the tests immediately when
error occurs.
+ CHECK_EQ(ERR_OK, err);
+ CHECK_EQ(kUnitBufferLength, n);
+ };
+ auto verify_data = [=]() {
+ int64_t file_size;
+ ASSERT_TRUE(utils::filesystem::file_size(
+ kTestFileName.c_str(), dsn::utils::FileDataType::kSensitive,
file_size));
+ ASSERT_EQ(kFileSize, file_size);
+
+ // Create a read file handler.
+ auto rfile = file::open(kTestFileName.c_str(), O_RDONLY | O_BINARY, 0);
+ ASSERT_NE(rfile, nullptr);
+
+ // 1. Check sequential read.
+ {
+ uint64_t offset = 0;
+ std::list<aio_task_ptr> tasks;
+ for (int i = 0; i < kTotalBufferCount; i++) {
+ char read_buffer[kUnitBufferLength + 1];
+ read_buffer[kUnitBufferLength] = 0;
+ auto t = ::dsn::file::read(rfile,
+ read_buffer,
+ kUnitBufferLength,
+ offset,
+ LPC_AIO_TEST,
+ nullptr,
+ check_callback);
+ offset += kUnitBufferLength;
+
+ t->wait();
+ ASSERT_EQ(kUnitBufferLength, t->get_transferred_size());
+ ASSERT_STREQ(kUnitBuffer, read_buffer);
+ }
+ }
- for (auto &t : tasks) {
- t->wait();
- }
+ // 2. Check concurrent read.
+ {
+ uint64_t offset = 0;
+ std::list<aio_task_ptr> tasks;
+ char read_buffers[kTotalBufferCount][kUnitBufferLength + 1];
+ for (int i = 0; i < kTotalBufferCount; i++) {
+ read_buffers[i][kUnitBufferLength] = 0;
+ auto t = ::dsn::file::read(rfile,
+ read_buffers[i],
+ kUnitBufferLength,
+ offset,
+ LPC_AIO_TEST,
+ nullptr,
+ check_callback);
+ offset += kUnitBufferLength;
+ tasks.push_back(t);
+ }
+ for (auto &t : tasks) {
+ t->wait();
+ ASSERT_EQ(kUnitBufferLength, t->get_transferred_size());
+ }
+ for (int i = 0; i < kTotalBufferCount; i++) {
+ ASSERT_STREQ(kUnitBuffer, read_buffers[i]);
+ }
+ }
+ ASSERT_EQ(ERR_OK, file::close(rfile));
+ };
- // overwrite
- offset = 0;
- tasks.clear();
- for (int i = 0; i < 100; i++) {
- auto t = ::dsn::file::write(fp, buffer, len, offset, LPC_AIO_TEST,
nullptr, nullptr);
- tasks.push_back(t);
- offset += len;
+ // 1. Sequential write.
+ {
+ auto wfile = file::open(kTestFileName.c_str(), O_RDWR | O_CREAT |
O_BINARY, 0666);
+ ASSERT_NE(wfile, nullptr);
+
+ uint64_t offset = 0;
+ std::list<aio_task_ptr> tasks;
+ for (int i = 0; i < kTotalBufferCount; i++) {
+ auto t = ::dsn::file::write(wfile,
+ kUnitBuffer,
+ kUnitBufferLength,
+ offset,
+ LPC_AIO_TEST,
+ nullptr,
+ check_callback);
+ offset += kUnitBufferLength;
+ tasks.push_back(t);
+ }
+ for (auto &t : tasks) {
+ t->wait();
+ ASSERT_EQ(kUnitBufferLength, t->get_transferred_size());
+ }
+ ASSERT_EQ(ERR_OK, file::flush(wfile));
+ ASSERT_EQ(ERR_OK, file::close(wfile));
}
+ NO_FATALS(verify_data());
- for (auto &t : tasks) {
- t->wait();
- EXPECT_TRUE(t->get_transferred_size() == (size_t)len);
- }
+ // 2. Un-sequential write.
+ {
+ auto wfile = file::open(kTestFileName.c_str(), O_RDWR | O_CREAT |
O_BINARY, 0666);
+ ASSERT_NE(wfile, nullptr);
- // vector write
- tasks.clear();
- std::unique_ptr<dsn_file_buffer_t[]> buffers(new dsn_file_buffer_t[100]);
- for (int i = 0; i < 10; i++) {
- buffers[i].buffer = static_cast<void *>(const_cast<char *>(buffer));
- buffers[i].size = len;
- }
- for (int i = 0; i < 10; i++) {
- tasks.push_back(::dsn::file::write_vector(
- fp, buffers.get(), 10, offset, LPC_AIO_TEST, nullptr, nullptr));
- offset += 10 * len;
- }
- for (auto &t : tasks) {
- t->wait();
- EXPECT_TRUE(t->get_transferred_size() == 10 * len);
- }
- auto err = file::close(fp);
- EXPECT_TRUE(err == ERR_OK);
-
- // read
- char *buffer2 = (char *)alloca((size_t)len);
- fp = file::open("tmp", O_RDONLY | O_BINARY, 0);
-
- // concurrent read
- offset = 0;
- tasks.clear();
- for (int i = 0; i < 100; i++) {
- auto t = ::dsn::file::read(fp, buffer2, len, offset, LPC_AIO_TEST,
nullptr, nullptr);
- tasks.push_back(t);
- offset += len;
- }
+ std::vector<uint64_t> offsets;
+ offsets.reserve(kTotalBufferCount);
+ for (int i = 0; i < kTotalBufferCount; i++) {
+ offsets.push_back(i * kUnitBufferLength);
+ }
- for (auto &t : tasks) {
- t->wait();
- EXPECT_TRUE(t->get_transferred_size() == (size_t)len);
- }
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ std::shuffle(offsets.begin(), offsets.end(), gen);
- // sequential read
- offset = 0;
- tasks.clear();
- for (int i = 0; i < 200; i++) {
- buffer2[0] = 'x';
- auto t = ::dsn::file::read(fp, buffer2, len, offset, LPC_AIO_TEST,
nullptr, nullptr);
- offset += len;
-
- t->wait();
- EXPECT_TRUE(t->get_transferred_size() == (size_t)len);
- EXPECT_TRUE(dsn::utils::mequals(buffer, buffer2, len));
+ std::list<aio_task_ptr> tasks;
+ for (const auto &offset : offsets) {
+ auto t = ::dsn::file::write(wfile,
+ kUnitBuffer,
+ kUnitBufferLength,
+ offset,
+ LPC_AIO_TEST,
+ nullptr,
+ check_callback);
+ tasks.push_back(t);
+ }
+ for (auto &t : tasks) {
+ t->wait();
+ ASSERT_EQ(kUnitBufferLength, t->get_transferred_size());
+ }
+ ASSERT_EQ(ERR_OK, file::flush(wfile));
+ ASSERT_EQ(ERR_OK, file::close(wfile));
}
-
- err = file::close(fp);
- fail::teardown();
- EXPECT_TRUE(err == ERR_OK);
-
- utils::filesystem::remove_path("tmp");
+ NO_FATALS(verify_data());
+
+ // 3. Overwrite.
+ {
+ auto wfile = file::open(kTestFileName.c_str(), O_RDWR | O_CREAT |
O_BINARY, 0666);
+ ASSERT_NE(wfile, nullptr);
+
+ uint64_t offset = 0;
+ std::list<aio_task_ptr> tasks;
+ for (int i = 0; i < kTotalBufferCount; i++) {
+ auto t = ::dsn::file::write(wfile,
+ kUnitBuffer,
+ kUnitBufferLength,
+ offset,
+ LPC_AIO_TEST,
+ nullptr,
+ check_callback);
+ offset += kUnitBufferLength;
+ tasks.push_back(t);
+ }
+ for (auto &t : tasks) {
+ t->wait();
+ ASSERT_EQ(kUnitBufferLength, t->get_transferred_size());
+ }
+ ASSERT_EQ(ERR_OK, file::flush(wfile));
+ ASSERT_EQ(ERR_OK, file::close(wfile));
+ }
+ NO_FATALS(verify_data());
+
+ // 4. Vector write.
+ {
+ auto wfile = file::open(kTestFileName.c_str(), O_RDWR | O_CREAT |
O_BINARY, 0666);
+ ASSERT_NE(wfile, nullptr);
+
+ uint64_t offset = 0;
+ std::list<aio_task_ptr> tasks;
+ std::unique_ptr<dsn_file_buffer_t[]> buffers(new
dsn_file_buffer_t[kBufferCountPerBatch]);
+ for (int i = 0; i < kBufferCountPerBatch; i++) {
+ buffers[i].buffer = static_cast<void *>(const_cast<char
*>(kUnitBuffer));
+ buffers[i].size = kUnitBufferLength;
+ }
+ for (int i = 0; i < kTotalBufferCount / kBufferCountPerBatch; i++) {
+ tasks.push_back(
+ ::dsn::file::write_vector(wfile,
+ buffers.get(),
+ kBufferCountPerBatch,
+ offset,
+ LPC_AIO_TEST,
+ nullptr,
+ [=](::dsn::error_code err, size_t n)
{
+ CHECK_EQ(ERR_OK, err);
+ CHECK_EQ(kBufferCountPerBatch *
kUnitBufferLength, n);
+ }));
+ offset += kBufferCountPerBatch * kUnitBufferLength;
+ }
+ for (auto &t : tasks) {
+ t->wait();
+ ASSERT_EQ(kBufferCountPerBatch * kUnitBufferLength,
t->get_transferred_size());
+ }
+ ASSERT_EQ(ERR_OK, file::flush(wfile));
+ ASSERT_EQ(ERR_OK, file::close(wfile));
+ }
+ NO_FATALS(verify_data());
}
-TEST(core, aio_share)
+TEST_P(aio_test, aio_share)
{
- auto fp = file::open("tmp", O_WRONLY | O_CREAT | O_BINARY, 0666);
- EXPECT_TRUE(fp != nullptr);
+ auto wfile = file::open(kTestFileName.c_str(), O_WRONLY | O_CREAT |
O_BINARY, 0666);
+ ASSERT_NE(wfile, nullptr);
- auto fp2 = file::open("tmp", O_RDONLY | O_BINARY, 0);
- EXPECT_TRUE(fp2 != nullptr);
+ auto rfile = file::open(kTestFileName.c_str(), O_RDONLY | O_BINARY, 0);
+ ASSERT_NE(rfile, nullptr);
- file::close(fp);
- file::close(fp2);
-
- utils::filesystem::remove_path("tmp");
+ ASSERT_EQ(ERR_OK, file::close(wfile));
+ ASSERT_EQ(ERR_OK, file::close(rfile));
}
-TEST(core, operation_failed)
+TEST_P(aio_test, operation_failed)
{
- fail::setup();
- fail::cfg("aio_pwrite_incomplete", "void()");
-
- auto fp = file::open("tmp_test_file", O_WRONLY, 0600);
- EXPECT_TRUE(fp == nullptr);
-
auto err = std::make_unique<dsn::error_code>();
auto count = std::make_unique<size_t>();
auto io_callback = [&err, &count](::dsn::error_code e, size_t n) {
@@ -181,39 +289,42 @@ TEST(core, operation_failed)
*count = n;
};
- fp = file::open("tmp_test_file", O_WRONLY | O_CREAT | O_BINARY, 0666);
- EXPECT_TRUE(fp != nullptr);
- char buffer[512];
- const char *str = "hello file";
- auto t = ::dsn::file::write(fp, str, strlen(str), 0, LPC_AIO_TEST,
nullptr, io_callback, 0);
+ auto wfile = file::open(kTestFileName.c_str(), O_WRONLY | O_CREAT |
O_BINARY, 0666);
+ ASSERT_NE(wfile, nullptr);
+
+ char buff[512] = {0};
+ const char *kUnitBuffer = "hello file";
+ const size_t kUnitBufferLength = strlen(kUnitBuffer);
+ auto t = ::dsn::file::write(
+ wfile, kUnitBuffer, kUnitBufferLength, 0, LPC_AIO_TEST, nullptr,
io_callback, 0);
t->wait();
- EXPECT_TRUE(*err == ERR_OK && *count == strlen(str));
+ ASSERT_EQ(ERR_OK, *err);
+ ASSERT_EQ(kUnitBufferLength, *count);
- t = ::dsn::file::read(fp, buffer, 512, 0, LPC_AIO_TEST, nullptr,
io_callback, 0);
+ t = ::dsn::file::read(wfile, buff, 512, 0, LPC_AIO_TEST, nullptr,
io_callback, 0);
t->wait();
- EXPECT_TRUE(*err == ERR_FILE_OPERATION_FAILED);
+ ASSERT_EQ(ERR_FILE_OPERATION_FAILED, *err);
- auto fp2 = file::open("tmp_test_file", O_RDONLY | O_BINARY, 0);
- EXPECT_TRUE(fp2 != nullptr);
+ auto rfile = file::open(kTestFileName.c_str(), O_RDONLY | O_BINARY, 0);
+ ASSERT_NE(nullptr, rfile);
- t = ::dsn::file::read(fp2, buffer, 512, 0, LPC_AIO_TEST, nullptr,
io_callback, 0);
+ t = ::dsn::file::read(rfile, buff, 512, 0, LPC_AIO_TEST, nullptr,
io_callback, 0);
t->wait();
- EXPECT_TRUE(*err == ERR_OK && *count == strlen(str));
- EXPECT_TRUE(dsn::utils::equals(buffer, str, 10));
+ ASSERT_EQ(ERR_OK, *err);
+ ASSERT_EQ(kUnitBufferLength, *count);
+ ASSERT_STREQ(kUnitBuffer, buff);
- t = ::dsn::file::read(fp2, buffer, 5, 0, LPC_AIO_TEST, nullptr,
io_callback, 0);
+ t = ::dsn::file::read(rfile, buff, 5, 0, LPC_AIO_TEST, nullptr,
io_callback, 0);
t->wait();
- EXPECT_TRUE(*err == ERR_OK && *count == 5);
- EXPECT_TRUE(dsn::utils::equals(buffer, str, 5));
+ ASSERT_EQ(ERR_OK, *err);
+ ASSERT_EQ(5, *count);
+ ASSERT_STREQ(kUnitBuffer, buff);
- t = ::dsn::file::read(fp2, buffer, 512, 100, LPC_AIO_TEST, nullptr,
io_callback, 0);
+ t = ::dsn::file::read(rfile, buff, 512, 100, LPC_AIO_TEST, nullptr,
io_callback, 0);
t->wait();
- LOG_INFO("error code: {}", *err);
- file::close(fp);
- file::close(fp2);
- fail::teardown();
-
- EXPECT_TRUE(utils::filesystem::remove_path("tmp_test_file"));
+ ASSERT_EQ(ERR_HANDLE_EOF, *err);
+ ASSERT_EQ(ERR_OK, file::close(wfile));
+ ASSERT_EQ(ERR_OK, file::close(rfile));
}
DEFINE_TASK_CODE_AIO(LPC_AIO_TEST_READ, TASK_PRIORITY_COMMON,
THREAD_POOL_DEFAULT)
@@ -223,22 +334,39 @@ struct aio_result
dsn::error_code err;
size_t sz;
};
-TEST(core, dsn_file)
+
+TEST_P(aio_test, dsn_file)
{
- int64_t fin_size, fout_size;
- ASSERT_TRUE(utils::filesystem::file_size("copy_source.txt", fin_size));
- ASSERT_LT(0, fin_size);
+ std::string src_file = "copy_source.txt";
+ std::string dst_file = "copy_dest.txt";
+ if (FLAGS_encrypt_data_at_rest) {
+ auto s = dsn::utils::encrypt_file(src_file, src_file + ".encrypted");
+ ASSERT_TRUE(s.ok()) << s.ToString();
+ src_file += ".encrypted";
+
+ s = dsn::utils::encrypt_file(dst_file, dst_file + ".encrypted");
+ ASSERT_TRUE(s.ok()) << s.ToString();
+ dst_file += ".encrypted";
+ }
+
+ int64_t src_file_size;
+ ASSERT_TRUE(utils::filesystem::file_size(
+ src_file, dsn::utils::FileDataType::kSensitive, src_file_size));
+ ASSERT_LT(0, src_file_size);
+ std::string src_file_md5;
+ ASSERT_EQ(ERR_OK, utils::filesystem::md5sum(src_file, src_file_md5));
+ ASSERT_FALSE(src_file_md5.empty());
- dsn::disk_file *fin = file::open("copy_source.txt", O_RDONLY, 0);
+ auto fin = file::open(src_file.c_str(), O_RDONLY | O_BINARY, 0);
ASSERT_NE(nullptr, fin);
- dsn::disk_file *fout = file::open("copy_dest.txt", O_RDWR | O_CREAT |
O_TRUNC, 0666);
+ auto fout = file::open(dst_file.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666);
ASSERT_NE(nullptr, fout);
- char buffer[1024];
+ char kUnitBuffer[1024];
uint64_t offset = 0;
while (true) {
aio_result rin;
aio_task_ptr tin = file::read(fin,
- buffer,
+ kUnitBuffer,
1024,
offset,
LPC_AIO_TEST_READ,
@@ -270,7 +398,7 @@ TEST(core, dsn_file)
aio_result rout;
aio_task_ptr tout = file::write(fout,
- buffer,
+ kUnitBuffer,
rin.sz,
offset,
LPC_AIO_TEST_WRITE,
@@ -296,10 +424,15 @@ TEST(core, dsn_file)
offset += rin.sz;
}
- ASSERT_EQ((uint64_t)fin_size, offset);
+ ASSERT_EQ(static_cast<uint64_t>(src_file_size), offset);
ASSERT_EQ(ERR_OK, file::close(fout));
ASSERT_EQ(ERR_OK, file::close(fin));
- ASSERT_TRUE(utils::filesystem::file_size("copy_dest.txt", fout_size));
- ASSERT_EQ(fin_size, fout_size);
+ int64_t dst_file_size;
+ ASSERT_TRUE(utils::filesystem::file_size(
+ dst_file, dsn::utils::FileDataType::kSensitive, dst_file_size));
+ ASSERT_EQ(src_file_size, dst_file_size);
+ std::string dst_file_md5;
+ ASSERT_EQ(ERR_OK, utils::filesystem::md5sum(src_file, dst_file_md5));
+ ASSERT_EQ(src_file_md5, dst_file_md5);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]