This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new a2f2ebd3d refactor(simple_meta_state): use rocksdb API to read/write 
file (#1620)
a2f2ebd3d is described below

commit a2f2ebd3d2b5f2075497ea3f3bd18bf8fe411578
Author: Yingchun Lai <[email protected]>
AuthorDate: Tue Sep 26 04:25:22 2023 -0500

    refactor(simple_meta_state): use rocksdb API to read/write file (#1620)
    
    https://github.com/apache/incubator-pegasus/issues/887
    
    There are no functional changes; this commit only refactors
    meta_state_service_simple and the related unit test.
---
 src/meta/distributed_lock_service_simple.h      |   1 +
 src/meta/dump_file.h                            |   2 +
 src/meta/meta_state_service_simple.cpp          | 123 ++++++++++++++----------
 src/meta/meta_state_service_simple.h            |   1 +
 src/meta/test/meta_state/meta_state_service.cpp |  88 ++++++++++-------
 5 files changed, 131 insertions(+), 84 deletions(-)

diff --git a/src/meta/distributed_lock_service_simple.h 
b/src/meta/distributed_lock_service_simple.h
index a2da463b4..59b2bde5e 100644
--- a/src/meta/distributed_lock_service_simple.h
+++ b/src/meta/distributed_lock_service_simple.h
@@ -53,6 +53,7 @@
 
 namespace dsn {
 namespace dist {
+// Only for test purpose.
 class distributed_lock_service_simple : public distributed_lock_service
 {
 public:
diff --git a/src/meta/dump_file.h b/src/meta/dump_file.h
index 4e8018ca5..062ffd1fc 100644
--- a/src/meta/dump_file.h
+++ b/src/meta/dump_file.h
@@ -63,6 +63,8 @@ struct block_header
     uint32_t crc32;
 };
 
+// TODO(yingchun): use rocksdb APIs to unify the file operations.
+// A tool to dump app_states of meta server to local file, used by remote 
command "meta.dump".
 class dump_file
 {
 public:
diff --git a/src/meta/meta_state_service_simple.cpp 
b/src/meta/meta_state_service_simple.cpp
index f2db0da0a..6ba00176e 100644
--- a/src/meta/meta_state_service_simple.cpp
+++ b/src/meta/meta_state_service_simple.cpp
@@ -27,7 +27,6 @@
 #include "meta_state_service_simple.h"
 
 #include <fcntl.h>
-#include <stdio.h>
 #include <string.h>
 #include <algorithm>
 #include <set>
@@ -35,6 +34,9 @@
 #include <utility>
 
 #include "aio/file_io.h"
+#include "rocksdb/env.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
 #include "runtime/service_app.h"
 #include "runtime/task/async_calls.h"
 #include "runtime/task/task.h"
@@ -228,7 +230,7 @@ error_code meta_state_service_simple::apply_transaction(
         default:
             CHECK(false, "unsupported operation");
         }
-        CHECK_EQ_MSG(ec, ERR_OK, "unexpected error when applying");
+        CHECK_EQ_MSG(ERR_OK, ec, "unexpected error when applying");
     }
 
     return ERR_OK;
@@ -242,53 +244,73 @@ error_code meta_state_service_simple::initialize(const 
std::vector<std::string>
     _offset = 0;
     std::string log_path = dsn::utils::filesystem::path_combine(work_dir, 
"meta_state_service.log");
     if (utils::filesystem::file_exists(log_path)) {
-        if (FILE *fd = fopen(log_path.c_str(), "rb")) {
-            for (;;) {
-                log_header header;
-                if (fread(&header, sizeof(log_header), 1, fd) != 1) {
-                    break;
-                }
-                if (header.magic != log_header::default_magic) {
-                    break;
-                }
-                std::shared_ptr<char> 
buffer(dsn::utils::make_shared_array<char>(header.size));
-                if (fread(buffer.get(), header.size, 1, fd) != 1) {
-                    break;
-                }
-                _offset += sizeof(header) + header.size;
-                binary_reader reader(blob(buffer, (int)header.size));
-                int op_type = 0;
-                reader.read(op_type);
-
-                switch (static_cast<operation_type>(op_type)) {
-                case operation_type::create_node: {
-                    std::string node;
-                    blob data;
-                    create_node_log::parse(reader, node, data);
-                    create_node_internal(node, data);
-                    break;
-                }
-                case operation_type::delete_node: {
-                    std::string node;
-                    bool recursively_delete;
-                    delete_node_log::parse(reader, node, recursively_delete);
-                    delete_node_internal(node, recursively_delete);
-                    break;
-                }
-                case operation_type::set_data: {
-                    std::string node;
-                    blob data;
-                    set_data_log::parse(reader, node, data);
-                    set_data_internal(node, data);
-                    break;
-                }
-                default:
-                    // The log is complete but its content is modified by 
cosmic ray. This is
-                    // unacceptable
-                    CHECK(false, "meta state server log corrupted");
-                }
+        std::unique_ptr<rocksdb::SequentialFile> log_file;
+        auto s =
+            rocksdb::Env::Default()->NewSequentialFile(log_path, &log_file, 
rocksdb::EnvOptions());
+        CHECK(s.ok(), "open log file '{}' failed, err = {}", log_path, 
s.ToString());
+
+        while (true) {
+            static const int kLogHeaderSize = sizeof(log_header);
+            static const int kDefaultMagic = 0xdeadbeef;
+            rocksdb::Slice result;
+
+            // Read header.
+            char scratch[kLogHeaderSize] = {0};
+            s = log_file->PositionedRead(_offset, kLogHeaderSize, &result, 
scratch);
+            CHECK(s.ok(), "read log file '{}' header failed, err = {}", 
log_path, s.ToString());
+            if (result.empty()) {
+                LOG_INFO("read EOF of log file '{}'", log_path);
+                break;
+            }
+            log_header *header = reinterpret_cast<log_header *>(scratch);
+            if (header->magic != kDefaultMagic) {
+                LOG_WARNING("read log file '{}' header with bad magic {}, skip 
the left!",
+                            log_path,
+                            header->magic);
+                break;
+            }
+            _offset += kLogHeaderSize;
+
+            // Read body.
+            std::shared_ptr<char> 
buffer(dsn::utils::make_shared_array<char>(header->size));
+            s = log_file->PositionedRead(_offset, header->size, &result, 
buffer.get());
+            CHECK(s.ok(),
+                  "read log file '{}' header with bad body, err = {}",
+                  log_path,
+                  s.ToString());
+            _offset += header->size;
+
+            binary_reader reader(blob(buffer, header->size));
+            int op_type = 0;
+            CHECK_EQ(sizeof(op_type), reader.read(op_type));
+
+            switch (static_cast<operation_type>(op_type)) {
+            case operation_type::create_node: {
+                std::string node;
+                blob data;
+                create_node_log::parse(reader, node, data);
+                create_node_internal(node, data);
+                break;
+            }
+            case operation_type::delete_node: {
+                std::string node;
+                bool recursively_delete;
+                delete_node_log::parse(reader, node, recursively_delete);
+                delete_node_internal(node, recursively_delete);
+                break;
+            }
+            case operation_type::set_data: {
+                std::string node;
+                blob data;
+                set_data_log::parse(reader, node, data);
+                set_data_internal(node, data);
+                break;
+            }
+            default:
+                // The log is complete but its content is modified by cosmic 
ray. This is
+                // unacceptable
+                CHECK(false, "meta state server log corrupted");
             }
-            fclose(fd);
         }
     }
 
@@ -507,8 +529,9 @@ task_ptr meta_state_service_simple::get_children(const 
std::string &node,
 
 meta_state_service_simple::~meta_state_service_simple()
 {
-    _tracker.cancel_outstanding_tasks();
-    file::close(_log);
+    _tracker.wait_outstanding_tasks();
+    CHECK_EQ(ERR_OK, file::flush(_log));
+    CHECK_EQ(ERR_OK, file::close(_log));
 
     for (const auto &kv : _quick_map) {
         if ("/" != kv.first) {
diff --git a/src/meta/meta_state_service_simple.h 
b/src/meta/meta_state_service_simple.h
index f26e33e17..697b5b159 100644
--- a/src/meta/meta_state_service_simple.h
+++ b/src/meta/meta_state_service_simple.h
@@ -66,6 +66,7 @@ DEFINE_TASK_CODE_AIO(LPC_META_STATE_SERVICE_SIMPLE_INTERNAL,
                      TASK_PRIORITY_HIGH,
                      THREAD_POOL_DEFAULT);
 
+// Only for test purpose.
 class meta_state_service_simple : public meta_state_service
 {
 public:
diff --git a/src/meta/test/meta_state/meta_state_service.cpp 
b/src/meta/test/meta_state/meta_state_service.cpp
index a286c5067..4959670b2 100644
--- a/src/meta/test/meta_state/meta_state_service.cpp
+++ b/src/meta/test/meta_state/meta_state_service.cpp
@@ -27,6 +27,7 @@
 #include "meta/meta_state_service.h"
 
 #include <boost/lexical_cast.hpp>
+// IWYU pragma: no_include <gtest/gtest-param-test.h>
 // IWYU pragma: no_include <ext/alloc_traits.h>
 // IWYU pragma: no_include <gtest/gtest-message.h>
 // IWYU pragma: no_include <gtest/gtest-test-part.h>
@@ -36,12 +37,18 @@
 
 #include "meta/meta_state_service_simple.h"
 #include "meta/meta_state_service_zookeeper.h"
+#include "runtime/service_app.h"
 #include "runtime/task/task_tracker.h"
+#include "test_util/test_util.h"
 #include "utils/binary_reader.h"
 #include "utils/binary_writer.h"
+#include "utils/filesystem.h"
+#include "utils/flags.h"
 #include "utils/fmt_logging.h"
 #include "utils/threadpool_code.h"
 
+DSN_DECLARE_bool(encrypt_data_at_rest);
+
 using namespace dsn;
 using namespace dsn::dist;
 
@@ -50,8 +57,8 @@ DEFINE_TASK_CODE(META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
TASK_PRIORITY_HIGH, TH
 typedef std::function<meta_state_service *()> service_creator_func;
 typedef std::function<void(meta_state_service *)> service_deleter_func;
 
-#define expect_ok [](error_code ec) { EXPECT_TRUE(ec == ERR_OK); }
-#define expect_err [](error_code ec) { EXPECT_FALSE(ec == ERR_OK); }
+#define expect_ok [](error_code ec) { CHECK_EQ(ERR_OK, ec); }
+#define expect_err [](error_code ec) { CHECK_NE(ERR_OK, ec); }
 
 void provider_basic_test(const service_creator_func &service_creator,
                          const service_deleter_func &service_deleter)
@@ -70,9 +77,9 @@ void provider_basic_test(const service_creator_func 
&service_creator,
         service->get_children("/1",
                               META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
                               [](error_code ec, const std::vector<std::string> 
&children) {
-                                  CHECK(ec == ERR_OK && children.size() == 1 &&
-                                            *children.begin() == "1",
-                                        "unexpected child");
+                                  CHECK_EQ(ERR_OK, ec);
+                                  CHECK_EQ(1, children.size());
+                                  CHECK_EQ("1", *children.begin());
                               });
         service->node_exist("/1/1", META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, 
expect_ok)->wait();
         service->delete_node("/1", false, 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_err)
@@ -107,11 +114,11 @@ void provider_basic_test(const service_creator_func 
&service_creator,
             ->get_data("/1",
                        META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
                        [](error_code ec, const dsn::blob &value) {
-                           expect_ok(ec);
+                           CHECK_EQ(ERR_OK, ec);
                            dsn::binary_reader reader(value);
                            int read_value = 0;
                            reader.read(read_value);
-                           CHECK_EQ(read_value, 0xdeadbeef);
+                           CHECK_EQ(0xdeadbeef, read_value);
                        })
             ->wait();
         writer = dsn::binary_writer();
@@ -124,27 +131,26 @@ void provider_basic_test(const service_creator_func 
&service_creator,
             ->get_data("/1",
                        META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
                        [](error_code ec, const dsn::blob &value) {
-                           expect_ok(ec);
+                           CHECK_EQ(ERR_OK, ec);
                            dsn::binary_reader reader(value);
                            int read_value = 0;
                            reader.read(read_value);
-                           CHECK_EQ(read_value, 0xbeefdead);
+                           CHECK_EQ(0xbeefdead, read_value);
                        })
             ->wait();
     }
-    // clean the node created in previos code-block, to support test in next 
round
+    // clean the node created in previous code-block, to support test in next 
round
     {
         service->delete_node("/1", false, 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok)
             ->wait();
     }
 
-    typedef dsn::dist::meta_state_service::transaction_entries TEntries;
     // transaction op
     {
         // basic
         dsn::binary_writer writer;
         writer.write(0xdeadbeef);
-        std::shared_ptr<TEntries> entries = 
service->new_transaction_entries(5);
+        auto entries = service->new_transaction_entries(5);
         entries->create_node("/2");
         entries->create_node("/2/2");
         entries->create_node("/2/3");
@@ -155,11 +161,11 @@ void provider_basic_test(const service_creator_func 
&service_creator,
             entries, META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok);
         tsk->wait();
         for (unsigned int i = 0; i < 5; ++i) {
-            EXPECT_TRUE(entries->get_result(i) == ERR_OK);
+            ASSERT_EQ(ERR_OK, entries->get_result(i));
         }
 
         // an invalid operation will stop whole transaction
-        entries = service->new_transaction_entries(5);
+        entries = service->new_transaction_entries(4);
         entries->create_node("/3");
         entries->create_node("/4");
         entries->delete_node("/2"); // delete a non empty dir
@@ -168,11 +174,12 @@ void provider_basic_test(const service_creator_func 
&service_creator,
         service->submit_transaction(entries, 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_err)
             ->wait();
         error_code err[4] = {ERR_OK, ERR_OK, ERR_INVALID_PARAMETERS, 
ERR_INCONSISTENT_STATE};
-        for (unsigned int i = 0; i < 4; ++i)
-            EXPECT_EQ(err[i], entries->get_result(i));
+        for (unsigned int i = 0; i < 4; ++i) {
+            ASSERT_EQ(err[i], entries->get_result(i));
+        }
 
         // another invalid transaction
-        entries = service->new_transaction_entries(5);
+        entries = service->new_transaction_entries(4);
         entries->create_node("/3");
         entries->create_node("/4");
         entries->delete_node("/5"); // delete a non exist dir
@@ -182,8 +189,9 @@ void provider_basic_test(const service_creator_func 
&service_creator,
         err[2] = ERR_OBJECT_NOT_FOUND;
         service->submit_transaction(entries, 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_err)
             ->wait();
-        for (unsigned int i = 0; i < 4; ++i)
-            EXPECT_EQ(err[i], entries->get_result(i));
+        for (unsigned int i = 0; i < 4; ++i) {
+            ASSERT_EQ(err[i], entries->get_result(i));
+        }
     }
 
     // check replay with transaction
@@ -195,7 +203,9 @@ void provider_basic_test(const service_creator_func 
&service_creator,
             ->get_children("/2",
                            META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
                            [](error_code ec, const std::vector<std::string> 
&children) {
-                               ASSERT_TRUE(children.size() == 1 && children[0] 
== "2");
+                               CHECK_EQ(ERR_OK, ec);
+                               CHECK_EQ(1, children.size());
+                               CHECK_EQ("2", children[0]);
                            })
             ->wait();
 
@@ -203,27 +213,27 @@ void provider_basic_test(const service_creator_func 
&service_creator,
             ->get_data("/2",
                        META_STATE_SERVICE_SIMPLE_TEST_CALLBACK,
                        [](error_code ec, const blob &value) {
-                           ASSERT_TRUE(ec == ERR_OK);
+                           CHECK_EQ(ERR_OK, ec);
                            binary_reader reader(value);
                            int content_value;
                            reader.read(content_value);
-                           ASSERT_TRUE(content_value == 0xdeadbeef);
+                           CHECK_EQ(0xdeadbeef, content_value);
                        })
             ->wait();
     }
 
     // delete the nodes created just now, using transaction delete
     {
-        std::shared_ptr<TEntries> entries = 
service->new_transaction_entries(2);
+        auto entries = service->new_transaction_entries(2);
         entries->delete_node("/2/2");
         entries->delete_node("/2");
 
         service->submit_transaction(entries, 
META_STATE_SERVICE_SIMPLE_TEST_CALLBACK, expect_ok)
             ->wait();
-        error_code err[2] = {ERR_OK, ERR_OK};
 
-        for (unsigned int i = 0; i < 2; ++i)
-            EXPECT_EQ(err[i], entries->get_result(i));
+        for (unsigned int i = 0; i < 2; ++i) {
+            ASSERT_EQ(ERR_OK, entries->get_result(i));
+        }
     }
 
     service_deleter(service);
@@ -235,7 +245,7 @@ void recursively_create_node_callback(meta_state_service 
*service,
                                       int current_layer,
                                       error_code ec)
 {
-    ASSERT_TRUE(ec == ERR_OK);
+    ASSERT_EQ(ERR_OK, ec);
     if (current_layer <= 0)
         return;
 
@@ -279,31 +289,41 @@ void provider_recursively_create_delete_test(const 
service_creator_func &creator
     deleter(service);
 }
 
-#undef expect_ok
-#undef expect_err
+class meta_state_service_test : public pegasus::encrypt_data_test_base
+{
+};
+
+// TODO(yingchun): ENCRYPTION: add enable encryption test.
+INSTANTIATE_TEST_CASE_P(, meta_state_service_test, ::testing::Values(false));
 
-TEST(meta_state_service, simple)
+TEST_P(meta_state_service_test, simple)
 {
     auto simple_service_creator = [] {
         meta_state_service_simple *svc = new meta_state_service_simple();
-        svc->initialize({});
+        auto err = svc->initialize({});
+        CHECK_EQ(ERR_OK, err);
         return svc;
     };
     auto simple_service_deleter = [](meta_state_service *simple_svc) { delete 
simple_svc; };
 
     provider_basic_test(simple_service_creator, simple_service_deleter);
     provider_recursively_create_delete_test(simple_service_creator, 
simple_service_deleter);
+
+    std::string log_path = dsn::utils::filesystem::path_combine(
+        service_app::current_service_app_info().data_dir, 
"meta_state_service.log");
+    ASSERT_TRUE(dsn::utils::filesystem::remove_path(log_path));
 }
 
-TEST(meta_state_service, zookeeper)
+TEST_P(meta_state_service_test, zookeeper)
 {
     auto zookeeper_service_creator = [] {
         meta_state_service_zookeeper *svc = new meta_state_service_zookeeper();
-        svc->initialize({});
+        auto err = svc->initialize({});
+        CHECK_EQ(ERR_OK, err);
         return svc;
     };
     auto zookeeper_service_deleter = [](meta_state_service *zookeeper_svc) {
-        ASSERT_EQ(zookeeper_svc->finalize(), ERR_OK);
+        ASSERT_EQ(ERR_OK, zookeeper_svc->finalize());
     };
 
     provider_basic_test(zookeeper_service_creator, zookeeper_service_deleter);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to