This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new a2f2ebd3d refactor(simple_meta_state): use rocksdb API to read/write file (#1620)
a2f2ebd3d is described below
commit a2f2ebd3d2b5f2075497ea3f3bd18bf8fe411578
Author: Yingchun Lai <[email protected]>
AuthorDate: Tue Sep 26 04:25:22 2023 -0500
refactor(simple_meta_state): use rocksdb API to read/write file (#1620)
https://github.com/apache/incubator-pegasus/issues/887
There are no functional changes; this commit only refactors
meta_state_service_simple and the
related unit test.
---
src/meta/distributed_lock_service_simple.h | 1 +
src/meta/dump_file.h | 2 +
src/meta/meta_state_service_simple.cpp | 123 ++++++++++++++----------
src/meta/meta_state_service_simple.h | 1 +
src/meta/test/meta_state/meta_state_service.cpp | 88 ++++++++++-------
5 files changed, 131 insertions(+), 84 deletions(-)
diff --git a/src/meta/distributed_lock_service_simple.h
b/src/meta/distributed_lock_service_simple.h
index a2da463b4..59b2bde5e 100644
--- a/src/meta/distributed_lock_service_simple.h
+++ b/src/meta/distributed_lock_service_simple.h
@@ -53,6 +53,7 @@
namespace dsn {
namespace dist {
+// Only for test purpose.
class distributed_lock_service_simple : public distributed_lock_service
{
public:
diff --git a/src/meta/dump_file.h b/src/meta/dump_file.h
index 4e8018ca5..062ffd1fc 100644
--- a/src/meta/dump_file.h
+++ b/src/meta/dump_file.h
@@ -63,6 +63,8 @@ struct block_header
uint32_t crc32;
};
+// TODO(yingchun): use rocksdb APIs to unify the file operations.
+// A tool to dump app_states of meta server to local file, used by remote
command "meta.dump".
class dump_file
{
public:
diff --git a/src/meta/meta_state_service_simple.cpp
b/src/meta/meta_state_service_simple.cpp
index f2db0da0a..6ba00176e 100644
--- a/src/meta/meta_state_service_simple.cpp
+++ b/src/meta/meta_state_service_simple.cpp
@@ -27,7 +27,6 @@
#include "meta_state_service_simple.h"
#include <fcntl.h>
-#include <stdio.h>
#include <string.h>
#include <algorithm>
#include <set>
@@ -35,6 +34,9 @@
#include <utility>
#include "aio/file_io.h"
+#include "rocksdb/env.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
#include "runtime/service_app.h"
#include "runtime/task/async_calls.h"
#include "runtime/task/task.h"
@@ -228,7 +230,7 @@ error_code meta_state_service_simple::apply_transaction(
default:
CHECK(false, "unsupported operation");
}
- CHECK_EQ_MSG(ec, ERR_OK, "unexpected error when applying");
+ CHECK_EQ_MSG(ERR_OK, ec, "unexpected error when applying");
}
return ERR_OK;
@@ -242,53 +244,73 @@ error_code meta_state_service_simple::initialize(const
std::vector<std::string>
_offset = 0;
std::string log_path = dsn::utils::filesystem::path_combine(work_dir,
"meta_state_service.log");
if (utils::filesystem::file_exists(log_path)) {
- if (FILE *fd = fopen(log_path.c_str(), "rb")) {
- for (;;) {
- log_header header;
- if (fread(&header, sizeof(log_header), 1, fd) != 1) {
- break;
- }
- if (header.magic != log_header::default_magic) {
- break;
- }
- std::shared_ptr<char>
buffer(dsn::utils::make_shared_array<char>(header.size));
- if (fread(buffer.get(), header.size, 1, fd) != 1) {
- break;
- }
- _offset += sizeof(header) + header.size;
- binary_reader reader(blob(buffer, (int)header.size));
- int op_type = 0;
- reader.read(op_type);
-
- switch (static_cast<operation_type>(op_type)) {
- case operation_type::create_node: {
- std::string node;
- blob data;
- create_node_log::parse(reader, node, data);
- create_node_internal(node, data);
- break;
- }
- case operation_type::delete_node: {
- std::string node;
- bool recursively_delete;
- delete_node_log::parse(reader, node, recursively_delete);
- delete_node_internal(node, recursively_delete);
- break;
- }
- case operation_type::set_data: {
- std::string node;
- blob data;
- set_data_log::parse(reader, node, data);
- set_data_internal(node, data);
- break;
- }
- default:
- // The log is complete but its content is modified by
cosmic ray. This is
- // unacceptable
- CHECK(false, "meta state server log corrupted");
- }
+ std::unique_ptr<rocksdb::SequentialFile> log_file;
+ auto s =
+ rocksdb::Env::Default()->NewSequentialFile(log_path, &log_file,
rocksdb::EnvOptions());
+ CHECK(s.ok(), "open log file '{}' failed, err = {}", log_path,
s.ToString());
+
+ while (true) {
+ static const int kLogHeaderSize = sizeof(log_header);
+ static const int kDefaultMagic = 0xdeadbeef;
+ rocksdb::Slice result;
+
+ // Read header.
+ char scratch[kLogHeaderSize] = {0};
+ s = log_file->PositionedRead(_offset, kLogHeaderSize, &result,
scratch);
+ CHECK(s.ok(), "read log file '{}' header failed, err = {}",
log_path, s.ToString());
+ if (result.empty()) {
+ LOG_INFO("read EOF of log file '{}'", log_path);
+ break;
+ }
+ log_header *header = reinterpret_cast<log_header *>(scratch);
+ if (header->magic != kDefaultMagic) {
+ LOG_WARNING("read log file '{}' header with bad magic {}, skip
the left!",
+ log_path,
+ header->magic);
+ break;
+ }
+ _offset += kLogHeaderSize;
+
+ // Read body.
+ std::shared_ptr<char>
buffer(dsn::utils::make_shared_array<char>(header->size));
+ s = log_file->PositionedRead(_offset, header->size, &result,
buffer.get());
+ CHECK(s.ok(),
+ "read log file '{}' header with bad body, err = {}",
+ log_path,
+ s.ToString());
+ _offset += header->size;
+
+ binary_reader reader(blob(buffer, header->size));
+ int op_type = 0;
+ CHECK_EQ(sizeof(op_type), reader.read(op_type));
+
+ switch (static_cast<operation_type>(op_type)) {
+ case operation_type::create_node: {
+ std::string node;
+ blob data;
+ create_node_log::parse(reader, node, data);
+ create_node_internal(node, data);
+ break;
+ }
+ case operation_type::delete_node: {
+ std::string node;
+ bool recursively_delete;
+ delete_node_log::parse(reader, node, recursively_delete);
+ delete_node_internal(node, recursively_delete);
+ break;
+ }
+ case operation_type::set_data: {
+ std::string node;
+ blob data;
+ set_data_log::parse(reader, node, data);
+ set_data_internal(node, data);
+ break;
+ }
+ default:
+ // The log is complete but its content is modified by cosmic
ray. This is
+ // unacceptable
+ CHECK(false, "meta state server log corrupted");
}
- fclose(fd);
}
}
@@ -507,8 +529,9 @@ task_ptr meta_state_service_simple::get_children(const
std::string &node,
meta_state_service_simple::~meta_state_service_simple()
{
- _tracker.cancel_outstanding_tasks();
- file::close(_log);
+ _tracker.wait_outstanding_tasks();
+ CHECK_EQ(ERR_OK, file::flush(_log));
+ CHECK_EQ(ERR_OK, file::close(_log));
for (const auto &kv : _quick_map) {
if ("/" != kv.first) {
diff --git a/src/meta/meta_state_service_simple.h
b/src/meta/meta_state_service_simple.h
index f26e33e17..697b5b159 100644
--- a/src/meta/meta_state_service_simple.h
+++ b/src/meta/meta_state_service_simple.h
@@ -66,6 +66,7 @@ DEFINE_TASK_CODE_AIO(LPC_META_STATE_SERVICE_SIMPLE_INTERNAL,
TASK_PRIORITY_HIGH,
THREAD_POOL_DEFAULT);
+// Only for test purpose.
class meta_state_service_simple : public meta_state_service
{
public:
diff --git a/src/meta/test/meta_state/meta_state_service.cpp
b/src/meta/test/meta_state/meta_state_service.cpp
index a286c5067..4959670b2 100644
--- a/src/meta/test/meta_state/meta_state_service.cpp
+++ b/src/meta/test/meta_state/meta_state_service.cpp
@@ -27,6 +27,7 @@
#include "meta/meta_state_service.h"
#include <boost/lexical_cast.hpp>
+// IWYU pragma: no_include <gtest/gtest-param-test.h>
// IWYU pragma: no_include <ext/alloc_traits.h>
// IWYU pragma: no_include <gtest/gtest-message.h>
// IWYU pragma: no_include <gtest/gtest-test-part.h>
@@ -36,12 +37,18 @@
#include "meta/meta_state_service_simple.h"
#include "meta/meta_state_service_zookeeper.h"
+#include "runtime/service_app.h"
#include "runtime/task/task_tracker.h"
+#include "test_util/test_util.h"
#include "utils/binary_reader.h"
#include "utils/binary_writer.h"
+#include "utils/filesystem.h"
+#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/threadpool_code.h"
+DSN_DECLARE_bool(encrypt_data_at_rest);
+
using namespace dsn;
using namespace dsn::dist;
@@ -50,8 +57,8 @@ DEFINE_TASK_CODE(META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
TASK_PRIORITY_HIGH, TH
typedef std::function<meta_state_service *()> service_creator_func;
typedef std::function<void(meta_state_service *)> service_deleter_func;
-#define expect_ok [](error_code ec) { EXPECT_TRUE(ec == ERR_OK); }
-#define expect_err [](error_code ec) { EXPECT_FALSE(ec == ERR_OK); }
+#define expect_ok [](error_code ec) { CHECK_EQ(ERR_OK, ec); }
+#define expect_err [](error_code ec) { CHECK_NE(ERR_OK, ec); }
void provider_basic_test(const service_creator_func &service_creator,
const service_deleter_func &service_deleter)
@@ -70,9 +77,9 @@ void provider_basic_test(const service_creator_func
&service_creator,
service->get_children("/1",
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
[](error_code ec, const std::vector<std::string>
&children) {
- CHECK(ec == ERR_OK && children.size() == 1 &&
- *children.begin() == "1",
- "unexpected child");
+ CHECK_EQ(ERR_OK, ec);
+ CHECK_EQ(1, children.size());
+ CHECK_EQ("1", *children.begin());
});
service->node_exist("/1/1", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
expect_ok)->wait();
service->delete_node("/1", false,
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_err)
@@ -107,11 +114,11 @@ void provider_basic_test(const service_creator_func
&service_creator,
->get_data("/1",
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
[](error_code ec, const dsn::blob &value) {
- expect_ok(ec);
+ CHECK_EQ(ERR_OK, ec);
dsn::binary_reader reader(value);
int read_value = 0;
reader.read(read_value);
- CHECK_EQ(read_value, 0xdeadbeef);
+ CHECK_EQ(0xdeadbeef, read_value);
})
->wait();
writer = dsn::binary_writer();
@@ -124,27 +131,26 @@ void provider_basic_test(const service_creator_func
&service_creator,
->get_data("/1",
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
[](error_code ec, const dsn::blob &value) {
- expect_ok(ec);
+ CHECK_EQ(ERR_OK, ec);
dsn::binary_reader reader(value);
int read_value = 0;
reader.read(read_value);
- CHECK_EQ(read_value, 0xbeefdead);
+ CHECK_EQ(0xbeefdead, read_value);
})
->wait();
}
- // clean the node created in previos code-block, to support test in next
round
+ // clean the node created in previous code-block, to support test in next
round
{
service->delete_node("/1", false,
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok)
->wait();
}
- typedef dsn::dist::meta_state_service::transaction_entries TEntries;
// transaction op
{
// basic
dsn::binary_writer writer;
writer.write(0xdeadbeef);
- std::shared_ptr<TEntries> entries =
service->new_transaction_entries(5);
+ auto entries = service->new_transaction_entries(5);
entries->create_node("/2");
entries->create_node("/2/2");
entries->create_node("/2/3");
@@ -155,11 +161,11 @@ void provider_basic_test(const service_creator_func
&service_creator,
entries, META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok);
tsk->wait();
for (unsigned int i = 0; i < 5; ++i) {
- EXPECT_TRUE(entries->get_result(i) == ERR_OK);
+ ASSERT_EQ(ERR_OK, entries->get_result(i));
}
// an invalid operation will stop whole transaction
- entries = service->new_transaction_entries(5);
+ entries = service->new_transaction_entries(4);
entries->create_node("/3");
entries->create_node("/4");
entries->delete_node("/2"); // delete a non empty dir
@@ -168,11 +174,12 @@ void provider_basic_test(const service_creator_func
&service_creator,
service->submit_transaction(entries,
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_err)
->wait();
error_code err[4] = {ERR_OK, ERR_OK, ERR_INVALID_PARAMETERS,
ERR_INCONSISTENT_STATE};
- for (unsigned int i = 0; i < 4; ++i)
- EXPECT_EQ(err[i], entries->get_result(i));
+ for (unsigned int i = 0; i < 4; ++i) {
+ ASSERT_EQ(err[i], entries->get_result(i));
+ }
// another invalid transaction
- entries = service->new_transaction_entries(5);
+ entries = service->new_transaction_entries(4);
entries->create_node("/3");
entries->create_node("/4");
entries->delete_node("/5"); // delete a non exist dir
@@ -182,8 +189,9 @@ void provider_basic_test(const service_creator_func
&service_creator,
err[2] = ERR_OBJECT_NOT_FOUND;
service->submit_transaction(entries,
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_err)
->wait();
- for (unsigned int i = 0; i < 4; ++i)
- EXPECT_EQ(err[i], entries->get_result(i));
+ for (unsigned int i = 0; i < 4; ++i) {
+ ASSERT_EQ(err[i], entries->get_result(i));
+ }
}
// check replay with transaction
@@ -195,7 +203,9 @@ void provider_basic_test(const service_creator_func
&service_creator,
->get_children("/2",
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
[](error_code ec, const std::vector<std::string>
&children) {
- ASSERT_TRUE(children.size() == 1 && children[0]
== "2");
+ CHECK_EQ(ERR_OK, ec);
+ CHECK_EQ(1, children.size());
+ CHECK_EQ("2", children[0]);
})
->wait();
@@ -203,27 +213,27 @@ void provider_basic_test(const service_creator_func
&service_creator,
->get_data("/2",
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
[](error_code ec, const blob &value) {
- ASSERT_TRUE(ec == ERR_OK);
+ CHECK_EQ(ERR_OK, ec);
binary_reader reader(value);
int content_value;
reader.read(content_value);
- ASSERT_TRUE(content_value == 0xdeadbeef);
+ CHECK_EQ(0xdeadbeef, content_value);
})
->wait();
}
// delete the nodes created just now, using transaction delete
{
- std::shared_ptr<TEntries> entries =
service->new_transaction_entries(2);
+ auto entries = service->new_transaction_entries(2);
entries->delete_node("/2/2");
entries->delete_node("/2");
service->submit_transaction(entries,
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok)
->wait();
- error_code err[2] = {ERR_OK, ERR_OK};
- for (unsigned int i = 0; i < 2; ++i)
- EXPECT_EQ(err[i], entries->get_result(i));
+ for (unsigned int i = 0; i < 2; ++i) {
+ ASSERT_EQ(ERR_OK, entries->get_result(i));
+ }
}
service_deleter(service);
@@ -235,7 +245,7 @@ void recursively_create_node_callback(meta_state_service
*service,
int current_layer,
error_code ec)
{
- ASSERT_TRUE(ec == ERR_OK);
+ ASSERT_EQ(ERR_OK, ec);
if (current_layer <= 0)
return;
@@ -279,31 +289,41 @@ void provider_recursively_create_delete_test(const
service_creator_func &creator
deleter(service);
}
-#undef expect_ok
-#undef expect_err
+class meta_state_service_test : public pegasus::encrypt_data_test_base
+{
+};
+
+// TODO(yingchun): ENCRYPTION: add enable encryption test.
+INSTANTIATE_TEST_CASE_P(, meta_state_service_test, ::testing::Values(false));
-TEST(meta_state_service, simple)
+TEST_P(meta_state_service_test, simple)
{
auto simple_service_creator = [] {
meta_state_service_simple *svc = new meta_state_service_simple();
- svc->initialize({});
+ auto err = svc->initialize({});
+ CHECK_EQ(ERR_OK, err);
return svc;
};
auto simple_service_deleter = [](meta_state_service *simple_svc) { delete
simple_svc; };
provider_basic_test(simple_service_creator, simple_service_deleter);
provider_recursively_create_delete_test(simple_service_creator,
simple_service_deleter);
+
+ std::string log_path = dsn::utils::filesystem::path_combine(
+ service_app::current_service_app_info().data_dir,
"meta_state_service.log");
+ ASSERT_TRUE(dsn::utils::filesystem::remove_path(log_path));
}
-TEST(meta_state_service, zookeeper)
+TEST_P(meta_state_service_test, zookeeper)
{
auto zookeeper_service_creator = [] {
meta_state_service_zookeeper *svc = new meta_state_service_zookeeper();
- svc->initialize({});
+ auto err = svc->initialize({});
+ CHECK_EQ(ERR_OK, err);
return svc;
};
auto zookeeper_service_deleter = [](meta_state_service *zookeeper_svc) {
- ASSERT_EQ(zookeeper_svc->finalize(), ERR_OK);
+ ASSERT_EQ(ERR_OK, zookeeper_svc->finalize());
};
provider_basic_test(zookeeper_service_creator, zookeeper_service_deleter);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]