This is an automated email from the ASF dual-hosted git repository. jiashuo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit e0338f79e4b754614457fd199ecb1100171aa361 Author: Jiashuo <[email protected]> AuthorDate: Wed Mar 30 14:58:54 2022 +0800 feat(dup_enhancement#17): replica follower load duplication data when open replica (#917) --- .github/workflows/ci-pull-request.yaml | 1 + rdsn | 2 +- src/server/pegasus_server_impl.cpp | 156 ++++++++++++++------------- src/server/test/CMakeLists.txt | 1 + src/server/test/pegasus_server_impl_test.cpp | 24 +++++ src/server/test/pegasus_server_test_base.h | 18 +++- src/server/test/rocksdb_wrapper_test.cpp | 6 +- src/shell/commands/duplication.cpp | 6 +- 8 files changed, 130 insertions(+), 84 deletions(-) diff --git a/.github/workflows/ci-pull-request.yaml b/.github/workflows/ci-pull-request.yaml index 544640a..2d43c68 100644 --- a/.github/workflows/ci-pull-request.yaml +++ b/.github/workflows/ci-pull-request.yaml @@ -14,6 +14,7 @@ on: - master - 'v[0-9]+.*' # release branch - ci-test # testing branch for github action + - '*dev' # for manually triggering workflow workflow_dispatch: diff --git a/rdsn b/rdsn index 8c310b9..ea92106 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 8c310b95f8cf0a2123da5eed7e6621b6f94f30e0 +Subproject commit ea9210614ac5cac42cd3f8b8e1aedd68981b37de diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 4221aea..019854b 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -32,6 +32,7 @@ #include <dsn/dist/replication/replication.codes.h> #include <dsn/utility/flags.h> #include <dsn/utils/token_bucket_throttling_controller.h> +#include <dsn/dist/replication/duplication_common.h> #include "base/pegasus_key_schema.h" #include "base/pegasus_value_schema.h" @@ -1454,7 +1455,7 @@ void pegasus_server_impl::on_scan(scan_rpc rpc) void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache.fetch(args); } -::dsn::error_code pegasus_server_impl::start(int argc, char **argv) +dsn::error_code pegasus_server_impl::start(int argc, char **argv) { dassert_replica(!_is_open, "replica is already opened."); ddebug_replica("start to open app {}", data_dir()); @@ -1465,11 +1466,11 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache if (argc > 0) { if ((argc - 1) % 2 != 0) { derror_replica("parse envs failed, invalid argc = {}", argc); - return ::dsn::ERR_INVALID_PARAMETERS; + return dsn::ERR_INVALID_PARAMETERS; } if (argv == nullptr) { derror_replica("parse envs failed, invalid argv = nullptr"); - return ::dsn::ERR_INVALID_PARAMETERS; + return dsn::ERR_INVALID_PARAMETERS; } int idx = 1; while (idx < argc) { @@ -1485,8 +1486,9 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache // // here, we must distinguish three cases, such as: // case 1: we open the db that already exist - // case 2: we open a new db - // case 3: we restore the db base on old data + // case 2: we load duplication data base checkpoint from master + // case 3: we open a new db + // case 4: we restore the db base on old data // // if we want to restore the db base on old data, only all of the restore preconditions are // satisfied @@ -1496,62 +1498,67 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache // 3, restore_dir is exist // bool db_exist = true; - auto path = ::dsn::utils::filesystem::path_combine(data_dir(), "rdb"); - if (::dsn::utils::filesystem::path_exists(path)) { + auto rdb_path = dsn::utils::filesystem::path_combine(data_dir(), "rdb"); + auto duplication_path = duplication_dir(); + if (dsn::utils::filesystem::path_exists(rdb_path)) { // only case 1 - ddebug("%s: rdb is already exist, path = %s", replica_name(), path.c_str()); + ddebug_replica("rdb is already exist, path = {}", rdb_path); } else { - std::pair<std::string, bool> restore_info = get_restore_dir_from_env(envs); - const std::string &restore_dir = restore_info.first; - bool force_restore = restore_info.second; - if (restore_dir.empty()) { - // case 2 - if (force_restore) { - derror("%s: try to restore, but we can't combine restore_dir from envs", - replica_name()); - return ::dsn::ERR_FILE_OPERATION_FAILED; - } else { - db_exist = false; - dinfo("%s: open a new db, path = %s", replica_name(), path.c_str()); + // case 2 + if (dsn::utils::filesystem::path_exists(duplication_path) && is_duplication_follower()) { + if (!dsn::utils::filesystem::rename_path(duplication_path, rdb_path)) { + derror_replica( + "load duplication data from {} to {} failed", duplication_path, rdb_path); + return dsn::ERR_FILE_OPERATION_FAILED; } } else { - // case 3 - ddebug("%s: try to restore from restore_dir = %s", replica_name(), restore_dir.c_str()); - if (::dsn::utils::filesystem::directory_exists(restore_dir)) { - // here, we just rename restore_dir to rdb, then continue the normal process - if (::dsn::utils::filesystem::rename_path(restore_dir.c_str(), path.c_str())) { - ddebug("%s: rename restore_dir(%s) to rdb(%s) succeed", - replica_name(), - restore_dir.c_str(), - path.c_str()); + std::pair<std::string, bool> restore_info = get_restore_dir_from_env(envs); + const std::string &restore_dir = restore_info.first; + bool force_restore = restore_info.second; + if (restore_dir.empty()) { + // case 3 + if (force_restore) { + derror_replica("try to restore, but we can't combine restore_dir from envs"); + return dsn::ERR_FILE_OPERATION_FAILED; } else { - derror("%s: rename restore_dir(%s) to rdb(%s) failed", - replica_name(), - restore_dir.c_str(), - path.c_str()); - return ::dsn::ERR_FILE_OPERATION_FAILED; + db_exist = false; + dinfo_replica("open a new db, path = {}", rdb_path); } } else { - if (force_restore) { - derror("%s: try to restore, but restore_dir isn't exist, restore_dir = %s", - replica_name(), - restore_dir.c_str()); - return ::dsn::ERR_FILE_OPERATION_FAILED; + // case 4 + ddebug_replica("try to restore from restore_dir = {}", restore_dir); + if (dsn::utils::filesystem::directory_exists(restore_dir)) { + // here, we just rename restore_dir to rdb, then continue the normal process + if (dsn::utils::filesystem::rename_path(restore_dir, rdb_path)) { + ddebug_replica( + "rename restore_dir({}) to rdb({}) succeed", restore_dir, rdb_path); + } else { + derror_replica( + "rename restore_dir({}) to rdb({}) failed", restore_dir, rdb_path); + return dsn::ERR_FILE_OPERATION_FAILED; + } } else { - db_exist = false; - dwarn( - "%s: try to restore and restore_dir(%s) isn't exist, but we don't force " - "it, the role of this replica must not primary, so we open a new db on the " - "path(%s)", - replica_name(), - restore_dir.c_str(), - path.c_str()); + if (force_restore) { + derror_replica( + "try to restore, but restore_dir isn't exist, restore_dir = {}", + restore_dir); + return dsn::ERR_FILE_OPERATION_FAILED; + } else { + db_exist = false; + dwarn_replica( + "try to restore and restore_dir({}) isn't exist, but we don't force " + "it, the role of this replica must not primary, so we open a new db on " + "the " + "path({})", + restore_dir, + rdb_path); + } } } } } - ddebug("%s: start to open rocksDB's rdb(%s)", replica_name(), path.c_str()); + ddebug_replica("start to open rocksDB's rdb({})", rdb_path); // Here we create a `tmp_data_cf_opts` because we don't want to modify `_data_cf_opts`, which // will be used elsewhere. @@ -1561,9 +1568,9 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache // When DB exists, meta CF and data CF must be present. bool missing_meta_cf = true; bool missing_data_cf = true; - if (check_column_families(path, &missing_meta_cf, &missing_data_cf) != ::dsn::ERR_OK) { + if (check_column_families(rdb_path, &missing_meta_cf, &missing_data_cf) != dsn::ERR_OK) { derror_replica("check column families failed"); - return ::dsn::ERR_LOCAL_APP_FAILURE; + return dsn::ERR_LOCAL_APP_FAILURE; } dassert_replica(!missing_meta_cf, "You must upgrade Pegasus server from 2.0"); dassert_replica(!missing_data_cf, "Missing data column family"); @@ -1573,7 +1580,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache std::vector<rocksdb::ColumnFamilyDescriptor> loaded_cf_descs; rocksdb::ColumnFamilyOptions loaded_data_cf_opts; // Set `ignore_unknown_options` true for forward compatibility. - auto status = rocksdb::LoadLatestOptions(path, + auto status = rocksdb::LoadLatestOptions(rdb_path, rocksdb::Env::Default(), &loaded_db_opt, &loaded_cf_descs, @@ -1584,7 +1591,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache if (status.code() != rocksdb::Status::kInvalidArgument || status.ToString().find("pegasus_data") == std::string::npos) { derror_replica("load latest option file failed: {}.", status.ToString()); - return ::dsn::ERR_LOCAL_APP_FAILURE; + return dsn::ERR_LOCAL_APP_FAILURE; } has_incompatible_db_options = true; dwarn_replica("The latest option file has incompatible db options: {}, use default " @@ -1614,17 +1621,20 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache std::vector<rocksdb::ColumnFamilyDescriptor> column_families( {{DATA_COLUMN_FAMILY_NAME, tmp_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}}); - auto s = rocksdb::CheckOptionsCompatibility( - path, rocksdb::Env::Default(), _db_opts, column_families, /*ignore_unknown_options=*/true); + auto s = rocksdb::CheckOptionsCompatibility(rdb_path, + rocksdb::Env::Default(), + _db_opts, + column_families, + /*ignore_unknown_options=*/true); if (!s.ok() && !s.IsNotFound() && !has_incompatible_db_options) { derror_replica("rocksdb::CheckOptionsCompatibility failed, error = {}", s.ToString()); - return ::dsn::ERR_LOCAL_APP_FAILURE; + return dsn::ERR_LOCAL_APP_FAILURE; } std::vector<rocksdb::ColumnFamilyHandle *> handles_opened; - auto status = rocksdb::DB::Open(_db_opts, path, column_families, &handles_opened, &_db); + auto status = rocksdb::DB::Open(_db_opts, rdb_path, column_families, &handles_opened, &_db); if (!status.ok()) { derror_replica("rocksdb::DB::Open failed, error = {}", status.ToString()); - return ::dsn::ERR_LOCAL_APP_FAILURE; + return dsn::ERR_LOCAL_APP_FAILURE; } dcheck_eq_replica(2, handles_opened.size()); dcheck_eq_replica(handles_opened[0]->GetName(), DATA_COLUMN_FAMILY_NAME); @@ -1644,7 +1654,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) { derror_replica("open app failed, unsupported data version {}", _pegasus_data_version); release_db(); - return ::dsn::ERR_LOCAL_APP_FAILURE; + return dsn::ERR_LOCAL_APP_FAILURE; } // update last manual compact finish timestamp @@ -1674,7 +1684,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache last_durable_decree(), last_flushed); auto err = async_checkpoint(false); - if (err != ::dsn::ERR_OK) { + if (err != dsn::ERR_OK) { derror_replica("create checkpoint failed, error = {}", err.to_string()); release_db(); return err; @@ -1695,10 +1705,10 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache dinfo_replica("start the update replica-level rocksdb statistics timer task"); _update_replica_rdb_stat = - ::dsn::tasking::enqueue_timer(LPC_REPLICATION_LONG_COMMON, - &_tracker, - [this]() { this->update_replica_rocksdb_statistics(); }, - _update_rdb_stat_interval); + dsn::tasking::enqueue_timer(LPC_REPLICATION_LONG_COMMON, + &_tracker, + [this]() { this->update_replica_rocksdb_statistics(); }, + _update_rdb_stat_interval); // These counters are singletons on this server shared by all replicas, their metrics update // task should be scheduled once an interval on the server view. @@ -1707,7 +1717,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache // The timer task will always running even though there is no replicas dassert_f(kServerStatUpdateTimeSec.count() != 0, "kServerStatUpdateTimeSec shouldn't be zero"); - _update_server_rdb_stat = ::dsn::tasking::enqueue_timer( + _update_server_rdb_stat = dsn::tasking::enqueue_timer( LPC_REPLICATION_LONG_COMMON, nullptr, // TODO: the tracker is nullptr, we will fix it later []() { update_server_rocksdb_statistics(); }, @@ -1719,17 +1729,17 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache this, _read_hotkey_collector, _write_hotkey_collector, _read_size_throttling_controller); _server_write = dsn::make_unique<pegasus_server_write>(this, _verbose_log); - ::dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY, - &_tracker, - [this]() { _read_hotkey_collector->analyse_data(); }, - std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s)); + dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY, + &_tracker, + [this]() { _read_hotkey_collector->analyse_data(); }, + std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s)); - ::dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY, - &_tracker, - [this]() { _write_hotkey_collector->analyse_data(); }, - std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s)); + dsn::tasking::enqueue_timer(LPC_ANALYZE_HOTKEY, + &_tracker, + [this]() { _write_hotkey_collector->analyse_data(); }, + std::chrono::seconds(FLAGS_hotkey_analyse_time_interval_s)); - return ::dsn::ERR_OK; + return dsn::ERR_OK; } void pegasus_server_impl::cancel_background_work(bool wait) diff --git a/src/server/test/CMakeLists.txt b/src/server/test/CMakeLists.txt index ec68275..4bd8558 100644 --- a/src/server/test/CMakeLists.txt +++ b/src/server/test/CMakeLists.txt @@ -58,6 +58,7 @@ set(MY_PROJ_LIBS PocoJSON pegasus_base gtest + gmock ) add_definitions(-DPEGASUS_UNIT_TEST) add_definitions(-DENABLE_FAIL) diff --git a/src/server/test/pegasus_server_impl_test.cpp b/src/server/test/pegasus_server_impl_test.cpp index 0f3fab7..16671e4 100644 --- a/src/server/test/pegasus_server_impl_test.cpp +++ b/src/server/test/pegasus_server_impl_test.cpp @@ -143,5 +143,29 @@ TEST_F(pegasus_server_impl_test, test_update_user_specified_compaction) _server->update_user_specified_compaction(envs); ASSERT_EQ(user_specified_compaction, _server->_user_specified_compaction); } + +TEST_F(pegasus_server_impl_test, test_load_from_duplication_data) +{ + auto origin_file = fmt::format("{}/{}", _server->duplication_dir(), "checkpoint"); + dsn::utils::filesystem::create_directory(_server->duplication_dir()); + dsn::utils::filesystem::create_file(origin_file); + ASSERT_TRUE(dsn::utils::filesystem::file_exists(origin_file)); + + EXPECT_CALL(*_server, is_duplication_follower()).WillRepeatedly(testing::Return(true)); + + auto tempFolder = "invalid"; + dsn::utils::filesystem::rename_path(_server->data_dir(), tempFolder); + ASSERT_EQ(start(), dsn::ERR_FILE_OPERATION_FAILED); + + dsn::utils::filesystem::rename_path(tempFolder, _server->data_dir()); + auto rdb_path = fmt::format("{}/rdb/", _server->data_dir()); + auto new_file = fmt::format("{}/{}", rdb_path, "checkpoint"); + ASSERT_EQ(start(), dsn::ERR_LOCAL_APP_FAILURE); + ASSERT_TRUE(dsn::utils::filesystem::directory_exists(rdb_path)); + ASSERT_FALSE(dsn::utils::filesystem::file_exists(origin_file)); + ASSERT_TRUE(dsn::utils::filesystem::file_exists(new_file)); + dsn::utils::filesystem::remove_file_name(new_file); +} + } // namespace server } // namespace pegasus diff --git a/src/server/test/pegasus_server_test_base.h b/src/server/test/pegasus_server_test_base.h index 54af092..d9ed1e3 100644 --- a/src/server/test/pegasus_server_test_base.h +++ b/src/server/test/pegasus_server_test_base.h @@ -22,12 +22,22 @@ #include "server/pegasus_server_impl.h" #include <gtest/gtest.h> +#include <gmock/gmock.h> #include <dsn/dist/replication/replica_test_utils.h> #include <dsn/utility/filesystem.h> namespace pegasus { namespace server { +class mock_pegasus_server_impl : public pegasus_server_impl +{ +public: + mock_pegasus_server_impl(dsn::replication::replica *r) : pegasus_server_impl(r) {} + +public: + MOCK_CONST_METHOD0(is_duplication_follower, bool()); +}; + class pegasus_server_test_base : public ::testing::Test { public: @@ -41,10 +51,10 @@ public: dsn::app_info app_info; app_info.app_type = "pegasus"; - _replica = - dsn::replication::create_test_replica(_replica_stub, _gpid, app_info, "./", false); + _replica = dsn::replication::create_test_replica( + _replica_stub, _gpid, app_info, "./", false, false); - _server = dsn::make_unique<pegasus_server_impl>(_replica); + _server = dsn::make_unique<mock_pegasus_server_impl>(_replica); } dsn::error_code start(const std::map<std::string, std::string> &envs = {}) @@ -72,7 +82,7 @@ public: } protected: - std::unique_ptr<pegasus_server_impl> _server; + std::unique_ptr<mock_pegasus_server_impl> _server; dsn::replication::replica *_replica; dsn::replication::replica_stub *_replica_stub; dsn::gpid _gpid; diff --git a/src/server/test/rocksdb_wrapper_test.cpp b/src/server/test/rocksdb_wrapper_test.cpp index 313f522..7c21d0f 100644 --- a/src/server/test/rocksdb_wrapper_test.cpp +++ b/src/server/test/rocksdb_wrapper_test.cpp @@ -62,9 +62,9 @@ public: dsn::app_info app_info; app_info.app_type = "pegasus"; app_info.duplicating = true; - _replica = - dsn::replication::create_test_replica(_replica_stub, _gpid, app_info, "./", false); - _server = dsn::make_unique<pegasus_server_impl>(_replica); + _replica = dsn::replication::create_test_replica( + _replica_stub, _gpid, app_info, "./", false, false); + _server = dsn::make_unique<mock_pegasus_server_impl>(_replica); SetUp(); } diff --git a/src/shell/commands/duplication.cpp b/src/shell/commands/duplication.cpp index 90b1781..3119be4 100644 --- a/src/shell/commands/duplication.cpp +++ b/src/shell/commands/duplication.cpp @@ -205,7 +205,7 @@ bool change_dup_status(command_executor *e, std::string operation; switch (status) { - case duplication_status::DS_START: + case duplication_status::DS_LOG: operation = "starting duplication"; break; case duplication_status::DS_PAUSE: @@ -215,7 +215,7 @@ bool change_dup_status(command_executor *e, operation = "removing duplication"; break; default: - dfatal("unexpected duplication status %d", status); + dfatal("can't change duplication under status %d", status); } auto err_resp = sc->ddl_client->change_dup_status(app_name, dup_id, status); @@ -231,7 +231,7 @@ bool remove_dup(command_executor *e, shell_context *sc, arguments args) bool start_dup(command_executor *e, shell_context *sc, arguments args) { - return change_dup_status(e, sc, args, duplication_status::DS_START); + return change_dup_status(e, sc, args, duplication_status::DS_LOG); } bool pause_dup(command_executor *e, shell_context *sc, arguments args) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
