This is an automated email from the ASF dual-hosted git repository. yuchenhe pushed a commit to branch v2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit 839826d288159767a4048eb9d961eb88cfd5d99c Author: Zhang Yifan <[email protected]> AuthorDate: Mon Aug 31 11:58:57 2020 +0800 fix: Load options from file when open an exist DB (#587) --- rdsn | 2 +- src/server/meta_store.cpp | 58 +++++++++++++++--- src/server/meta_store.h | 12 ++++ src/server/pegasus_server_impl.cpp | 90 ++++++++++++++++++++-------- src/server/pegasus_server_impl.h | 7 ++- src/server/test/pegasus_server_impl_test.cpp | 37 +++++++++++- 6 files changed, 169 insertions(+), 37 deletions(-) diff --git a/rdsn b/rdsn index 40d86f7..2158aeb 160000 --- a/rdsn +++ b/rdsn @@ -1 +1 @@ -Subproject commit 40d86f7f7aa51ba874d9cc91cb7e267f18b2bd11 +Subproject commit 2158aeb5022a9881c18bbad3addd02c68eb93323 diff --git a/src/server/meta_store.cpp b/src/server/meta_store.cpp index e1562c4..fee4e51 100644 --- a/src/server/meta_store.cpp +++ b/src/server/meta_store.cpp @@ -58,6 +58,19 @@ uint64_t meta_store::get_decree_from_readonly_db(rocksdb::DB *db, return last_flushed_decree; } +std::string meta_store::get_usage_scenario() const +{ + // If couldn't find rocksdb usage scenario in meta column family, return normal in default. + std::string usage_scenario = ROCKSDB_ENV_USAGE_SCENARIO_NORMAL; + auto ec = get_string_value_from_meta_cf(false, ROCKSDB_ENV_USAGE_SCENARIO_KEY, &usage_scenario); + dassert_replica(ec == ::dsn::ERR_OK || ec == ::dsn::ERR_OBJECT_NOT_FOUND, + "rocksdb {} get {} from meta column family failed: {}", + _db->GetName(), + ROCKSDB_ENV_USAGE_SCENARIO_KEY, + ec.to_string()); + return usage_scenario; +} + ::dsn::error_code meta_store::get_value_from_meta_cf(bool read_flushed_data, const std::string &key, uint64_t *value) const @@ -72,18 +85,37 @@ uint64_t meta_store::get_decree_from_readonly_db(rocksdb::DB *db, uint64_t *value) { std::string data; + auto ec = get_string_value_from_meta_cf(db, cf, read_flushed_data, key, &data); + if (ec != ::dsn::ERR_OK) { + return ec; + } + dassert_f(dsn::buf2uint64(data, *value), + "rocksdb {} get \"{}\" from meta column family failed to parse into uint64", + db->GetName(), + data); + return ::dsn::ERR_OK; +} + +::dsn::error_code meta_store::get_string_value_from_meta_cf(bool read_flushed_data, + const std::string &key, + std::string *value) const +{ + return get_string_value_from_meta_cf(_db, _meta_cf, read_flushed_data, key, value); +} + +::dsn::error_code meta_store::get_string_value_from_meta_cf(rocksdb::DB *db, + rocksdb::ColumnFamilyHandle *cf, + bool read_flushed_data, + const std::string &key, + std::string *value) +{ rocksdb::ReadOptions rd_opts; if (read_flushed_data) { // only read 'flushed' data, mainly to read 'last_flushed_decree' rd_opts.read_tier = rocksdb::kPersistedTier; } - auto status = db->Get(rd_opts, cf, key, &data); + auto status = db->Get(rd_opts, cf, key, value); if (status.ok()) { - dassert_f(dsn::buf2uint64(data, *value), - "rocksdb {} get {} from meta column family got error value {}", - db->GetName(), - key, - data); return ::dsn::ERR_OK; } @@ -97,7 +129,13 @@ uint64_t meta_store::get_decree_from_readonly_db(rocksdb::DB *db, ::dsn::error_code meta_store::set_value_to_meta_cf(const std::string &key, uint64_t value) const { - auto status = _db->Put(_wt_opts, _meta_cf, key, std::to_string(value)); + return set_string_value_to_meta_cf(key, std::to_string(value)); +} + +::dsn::error_code meta_store::set_string_value_to_meta_cf(const std::string &key, + const std::string &value) const +{ + auto status = _db->Put(_wt_opts, _meta_cf, key, value); if (!status.ok()) { derror_replica( "Put {}={} to meta column family failed, status {}", key, value, status.ToString()); @@ -124,5 +162,11 @@ void meta_store::set_last_manual_compact_finish_time(uint64_t last_manual_compac set_value_to_meta_cf(LAST_MANUAL_COMPACT_FINISH_TIME, last_manual_compact_finish_time)); } +void meta_store::set_usage_scenario(const std::string &usage_scenario) const +{ + dcheck_eq_replica(::dsn::ERR_OK, + set_string_value_to_meta_cf(ROCKSDB_ENV_USAGE_SCENARIO_KEY, usage_scenario)); +} + } // namespace server } // namespace pegasus diff --git a/src/server/meta_store.h b/src/server/meta_store.h index 322971e..6e7a7b8 100644 --- a/src/server/meta_store.h +++ b/src/server/meta_store.h @@ -30,21 +30,33 @@ public: rocksdb::ColumnFamilyHandle *meta_cf) const; uint32_t get_data_version() const; uint64_t get_last_manual_compact_finish_time() const; + std::string get_usage_scenario() const; void set_last_flushed_decree(uint64_t decree) const; void set_data_version(uint32_t version) const; void set_last_manual_compact_finish_time(uint64_t last_manual_compact_finish_time) const; + void set_usage_scenario(const std::string &usage_scenario) const; private: ::dsn::error_code get_value_from_meta_cf(bool read_flushed_data, const std::string &key, uint64_t *value) const; + ::dsn::error_code get_string_value_from_meta_cf(bool read_flushed_data, + const std::string &key, + std::string *value) const; ::dsn::error_code set_value_to_meta_cf(const std::string &key, uint64_t value) const; + ::dsn::error_code set_string_value_to_meta_cf(const std::string &key, + const std::string &value) const; static ::dsn::error_code get_value_from_meta_cf(rocksdb::DB *db, rocksdb::ColumnFamilyHandle *cf, bool read_flushed_data, const std::string &key, uint64_t *value); + static ::dsn::error_code get_string_value_from_meta_cf(rocksdb::DB *db, + rocksdb::ColumnFamilyHandle *cf, + bool read_flushed_data, + const std::string &key, + std::string *value); friend class pegasus_write_service; diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index b181c44..b61bf31 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -8,6 +8,7 @@ #include <boost/lexical_cast.hpp> #include <rocksdb/convenience.h> #include <rocksdb/utilities/checkpoint.h> +#include <rocksdb/utilities/options_util.h> #include <dsn/utility/chrono_literals.h> #include <dsn/utility/utils.h> #include <dsn/utility/filesystem.h> @@ -1326,14 +1327,45 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache ddebug("%s: start to open rocksDB's rdb(%s)", replica_name(), path.c_str()); + // Here we create a `tmp_data_cf_opts` because we don't want to modify `_data_cf_opts`, which + // will be used elsewhere. + rocksdb::ColumnFamilyOptions tmp_data_cf_opts = _data_cf_opts; if (db_exist) { - // When DB exist, meta CF must be present. bool missing_meta_cf = true; - if (check_meta_cf(path, &missing_meta_cf) != ::dsn::ERR_OK) { - derror_replica("check meta column family failed"); + bool missing_data_cf = true; + // Load latest options from option file stored in the db directory. + rocksdb::DBOptions loaded_db_opt; + std::vector<rocksdb::ColumnFamilyDescriptor> loaded_cf_descs; + rocksdb::ColumnFamilyOptions loaded_data_cf_opts; + // Set `ignore_unknown_options` true for forward compatibility. + auto status = rocksdb::LoadLatestOptions(path, + rocksdb::Env::Default(), + &loaded_db_opt, + &loaded_cf_descs, + /*ignore_unknown_options=*/true); + if (!status.ok()) { + derror_replica("load latest option file failed."); return ::dsn::ERR_LOCAL_APP_FAILURE; } + for (int i = 0; i < loaded_cf_descs.size(); ++i) { + if (loaded_cf_descs[i].name == META_COLUMN_FAMILY_NAME) { + missing_meta_cf = false; + } else if (loaded_cf_descs[i].name == DATA_COLUMN_FAMILY_NAME) { + missing_data_cf = false; + loaded_data_cf_opts = loaded_cf_descs[i].options; + } else { + derror_replica("unknown column family name."); + return ::dsn::ERR_LOCAL_APP_FAILURE; + } + } + // When DB exists, meta CF and data CF must be present. dassert_replica(!missing_meta_cf, "You must upgrade Pegasus server from 2.0"); + dassert_replica(!missing_data_cf, "Missing data column family"); + // Reset usage scenario related options according to loaded_data_cf_opts. + // We don't use `loaded_data_cf_opts` directly because pointer-typed options will only be + // initialized with default values when calling 'LoadLatestOptions', see + // 'rocksdb/utilities/options_util.h'. + reset_usage_scenario_options(loaded_data_cf_opts, &tmp_data_cf_opts); } else { // When create new DB, we have to create a new column family to store meta data (meta column // family). @@ -1341,7 +1373,13 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache } std::vector<rocksdb::ColumnFamilyDescriptor> column_families( - {{DATA_COLUMN_FAMILY_NAME, _data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}}); + {{DATA_COLUMN_FAMILY_NAME, tmp_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}}); + auto s = rocksdb::CheckOptionsCompatibility( + path, rocksdb::Env::Default(), _db_opts, column_families, /*ignore_unknown_options=*/true); + if (!s.ok() && !s.IsNotFound()) { + derror_replica("rocksdb::CheckOptionsCompatibility failed, error = {}", s.ToString()); + return ::dsn::ERR_LOCAL_APP_FAILURE; + } std::vector<rocksdb::ColumnFamilyHandle *> handles_opened; auto status = rocksdb::DB::Open(_db_opts, path, column_families, &handles_opened, &_db); if (!status.ok()) { @@ -1360,6 +1398,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache if (db_exist) { _last_committed_decree = _meta_store->get_last_flushed_decree(); _pegasus_data_version = _meta_store->get_data_version(); + _usage_scenario = _meta_store->get_usage_scenario(); uint64_t last_manual_compact_finish_time = _meta_store->get_last_manual_compact_finish_time(); if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) { @@ -1407,8 +1446,10 @@ void pegasus_server_impl::on_clear_scanner(const int64_t &args) { _context_cache _is_open = true; - // set default usage scenario after db opened. - set_usage_scenario(ROCKSDB_ENV_USAGE_SCENARIO_NORMAL); + if (!db_exist) { + // When create a new db, update usage scenario according to app envs. + update_usage_scenario(envs); + } dinfo_replica("start the update rocksdb statistics timer task"); _update_replica_rdb_stat = @@ -2472,6 +2513,7 @@ bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario) return false; } if (set_options(new_options)) { + _meta_store->set_usage_scenario(usage_scenario); _usage_scenario = usage_scenario; ddebug_replica( "set usage scenario from \"{}\" to \"{}\" succeed", old_usage_scenario, usage_scenario); @@ -2483,6 +2525,23 @@ bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario) } } +void pegasus_server_impl::reset_usage_scenario_options( + const rocksdb::ColumnFamilyOptions &base_opts, rocksdb::ColumnFamilyOptions *target_opts) +{ + // reset usage scenario related options, refer to options set in 'set_usage_scenario' function. + target_opts->level0_file_num_compaction_trigger = base_opts.level0_file_num_compaction_trigger; + target_opts->level0_slowdown_writes_trigger = base_opts.level0_slowdown_writes_trigger; + target_opts->level0_stop_writes_trigger = base_opts.level0_stop_writes_trigger; + target_opts->soft_pending_compaction_bytes_limit = + base_opts.soft_pending_compaction_bytes_limit; + target_opts->hard_pending_compaction_bytes_limit = + base_opts.hard_pending_compaction_bytes_limit; + target_opts->disable_auto_compactions = base_opts.disable_auto_compactions; + target_opts->max_compaction_bytes = base_opts.max_compaction_bytes; + target_opts->write_buffer_size = base_opts.write_buffer_size; + target_opts->max_write_buffer_number = base_opts.max_write_buffer_number; +} + bool pegasus_server_impl::set_options( const std::unordered_map<std::string, std::string> &new_options) { @@ -2607,25 +2666,6 @@ void pegasus_server_impl::set_partition_version(int32_t partition_version) // TODO(heyuchen): set filter _partition_version in further pr } -::dsn::error_code pegasus_server_impl::check_meta_cf(const std::string &path, bool *missing_meta_cf) -{ - *missing_meta_cf = true; - std::vector<std::string> column_families; - auto s = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(), path, &column_families); - if (!s.ok()) { - derror_replica("rocksdb::DB::ListColumnFamilies failed, error = {}", s.ToString()); - return ::dsn::ERR_LOCAL_APP_FAILURE; - } - - for (const auto &column_family : column_families) { - if (column_family == META_COLUMN_FAMILY_NAME) { - *missing_meta_cf = false; - break; - } - } - return ::dsn::ERR_OK; -} - ::dsn::error_code pegasus_server_impl::flush_all_family_columns(bool wait) { rocksdb::FlushOptions options; diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index 2ce6015..7165f76 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -166,6 +166,8 @@ private: friend class pegasus_compression_options_test; friend class pegasus_server_impl_test; FRIEND_TEST(pegasus_server_impl_test, default_data_version); + FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_latest_options); + FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_app_envs); friend class pegasus_manual_compact_service; friend class pegasus_write_service; @@ -260,6 +262,9 @@ private: // return true if successfully changed bool set_usage_scenario(const std::string &usage_scenario); + void reset_usage_scenario_options(const rocksdb::ColumnFamilyOptions &base_opts, + rocksdb::ColumnFamilyOptions *target_opts); + // return true if successfully set bool set_options(const std::unordered_map<std::string, std::string> &new_options); @@ -305,8 +310,6 @@ private: return false; } - ::dsn::error_code check_meta_cf(const std::string &path, bool *missing_meta_cf); - void release_db(); void release_db(rocksdb::DB *db, const std::vector<rocksdb::ColumnFamilyHandle *> &handles); diff --git a/src/server/test/pegasus_server_impl_test.cpp b/src/server/test/pegasus_server_impl_test.cpp index c8c16b4..d9d161a 100644 --- a/src/server/test/pegasus_server_impl_test.cpp +++ b/src/server/test/pegasus_server_impl_test.cpp @@ -11,7 +11,7 @@ namespace server { class pegasus_server_impl_test : public pegasus_server_test_base { public: - pegasus_server_impl_test() : pegasus_server_test_base() { start(); } + pegasus_server_impl_test() : pegasus_server_test_base() {} void test_table_level_slow_query() { @@ -59,12 +59,45 @@ public: } }; -TEST_F(pegasus_server_impl_test, test_table_level_slow_query) { test_table_level_slow_query(); } +TEST_F(pegasus_server_impl_test, test_table_level_slow_query) +{ + start(); + test_table_level_slow_query(); +} TEST_F(pegasus_server_impl_test, default_data_version) { + start(); ASSERT_EQ(_server->_pegasus_data_version, 1); } +TEST_F(pegasus_server_impl_test, test_open_db_with_latest_options) +{ + // open a new db with no app env. + start(); + ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_NORMAL, _server->_usage_scenario); + // set bulk_load scenario for the db. + ASSERT_TRUE(_server->set_usage_scenario(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD)); + ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario); + rocksdb::Options opts = _server->_db->GetOptions(); + ASSERT_EQ(1000000000, opts.level0_file_num_compaction_trigger); + ASSERT_EQ(true, opts.disable_auto_compactions); + // reopen the db. + _server->stop(false); + start(); + ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario); + ASSERT_EQ(opts.level0_file_num_compaction_trigger, + _server->_db->GetOptions().level0_file_num_compaction_trigger); + ASSERT_EQ(opts.disable_auto_compactions, _server->_db->GetOptions().disable_auto_compactions); +} + +TEST_F(pegasus_server_impl_test, test_open_db_with_app_envs) +{ + std::map<std::string, std::string> envs; + envs[ROCKSDB_ENV_USAGE_SCENARIO_KEY] = ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD; + start(envs); + ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario); +} + } // namespace server } // namespace pegasus --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
