This is an automated email from the ASF dual-hosted git repository.
wutao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 65bbee2 fix: Load options from file when open an exist DB (#587)
65bbee2 is described below
commit 65bbee2190da2e2051d5bdc66c6448de95690c31
Author: Zhang Yifan <[email protected]>
AuthorDate: Mon Aug 31 11:58:57 2020 +0800
fix: Load options from file when open an exist DB (#587)
---
rdsn | 2 +-
src/server/meta_store.cpp | 58 +++++++++++++++---
src/server/meta_store.h | 12 ++++
src/server/pegasus_server_impl.cpp | 90 ++++++++++++++++++++--------
src/server/pegasus_server_impl.h | 7 ++-
src/server/test/pegasus_server_impl_test.cpp | 37 +++++++++++-
6 files changed, 169 insertions(+), 37 deletions(-)
diff --git a/rdsn b/rdsn
index abd93ac..f6f0eb4 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit abd93ac1b7b3c13e620e9018415ed3c8b1de6e53
+Subproject commit f6f0eb4b67bfdd03f1c0ac707f6bd6c459e4340c
diff --git a/src/server/meta_store.cpp b/src/server/meta_store.cpp
index e1562c4..fee4e51 100644
--- a/src/server/meta_store.cpp
+++ b/src/server/meta_store.cpp
@@ -58,6 +58,19 @@ uint64_t meta_store::get_decree_from_readonly_db(rocksdb::DB
*db,
return last_flushed_decree;
}
+std::string meta_store::get_usage_scenario() const
+{
+ // If the rocksdb usage scenario is not found in the meta column family, return
normal by default.
+ std::string usage_scenario = ROCKSDB_ENV_USAGE_SCENARIO_NORMAL;
+ auto ec = get_string_value_from_meta_cf(false,
ROCKSDB_ENV_USAGE_SCENARIO_KEY, &usage_scenario);
+ dassert_replica(ec == ::dsn::ERR_OK || ec == ::dsn::ERR_OBJECT_NOT_FOUND,
+ "rocksdb {} get {} from meta column family failed: {}",
+ _db->GetName(),
+ ROCKSDB_ENV_USAGE_SCENARIO_KEY,
+ ec.to_string());
+ return usage_scenario;
+}
+
::dsn::error_code meta_store::get_value_from_meta_cf(bool read_flushed_data,
const std::string &key,
uint64_t *value) const
@@ -72,18 +85,37 @@ uint64_t
meta_store::get_decree_from_readonly_db(rocksdb::DB *db,
uint64_t *value)
{
std::string data;
+ auto ec = get_string_value_from_meta_cf(db, cf, read_flushed_data, key,
&data);
+ if (ec != ::dsn::ERR_OK) {
+ return ec;
+ }
+ dassert_f(dsn::buf2uint64(data, *value),
+ "rocksdb {} get \"{}\" from meta column family failed to parse
into uint64",
+ db->GetName(),
+ data);
+ return ::dsn::ERR_OK;
+}
+
+::dsn::error_code meta_store::get_string_value_from_meta_cf(bool
read_flushed_data,
+ const std::string
&key,
+ std::string
*value) const
+{
+ return get_string_value_from_meta_cf(_db, _meta_cf, read_flushed_data,
key, value);
+}
+
+::dsn::error_code meta_store::get_string_value_from_meta_cf(rocksdb::DB *db,
+
rocksdb::ColumnFamilyHandle *cf,
+ bool
read_flushed_data,
+ const std::string
&key,
+ std::string *value)
+{
rocksdb::ReadOptions rd_opts;
if (read_flushed_data) {
// only read 'flushed' data, mainly to read 'last_flushed_decree'
rd_opts.read_tier = rocksdb::kPersistedTier;
}
- auto status = db->Get(rd_opts, cf, key, &data);
+ auto status = db->Get(rd_opts, cf, key, value);
if (status.ok()) {
- dassert_f(dsn::buf2uint64(data, *value),
- "rocksdb {} get {} from meta column family got error value
{}",
- db->GetName(),
- key,
- data);
return ::dsn::ERR_OK;
}
@@ -97,7 +129,13 @@ uint64_t
meta_store::get_decree_from_readonly_db(rocksdb::DB *db,
::dsn::error_code meta_store::set_value_to_meta_cf(const std::string &key,
uint64_t value) const
{
- auto status = _db->Put(_wt_opts, _meta_cf, key, std::to_string(value));
+ return set_string_value_to_meta_cf(key, std::to_string(value));
+}
+
+::dsn::error_code meta_store::set_string_value_to_meta_cf(const std::string
&key,
+ const std::string
&value) const
+{
+ auto status = _db->Put(_wt_opts, _meta_cf, key, value);
if (!status.ok()) {
derror_replica(
"Put {}={} to meta column family failed, status {}", key, value,
status.ToString());
@@ -124,5 +162,11 @@ void
meta_store::set_last_manual_compact_finish_time(uint64_t last_manual_compac
set_value_to_meta_cf(LAST_MANUAL_COMPACT_FINISH_TIME,
last_manual_compact_finish_time));
}
+void meta_store::set_usage_scenario(const std::string &usage_scenario) const
+{
+ dcheck_eq_replica(::dsn::ERR_OK,
+
set_string_value_to_meta_cf(ROCKSDB_ENV_USAGE_SCENARIO_KEY, usage_scenario));
+}
+
} // namespace server
} // namespace pegasus
diff --git a/src/server/meta_store.h b/src/server/meta_store.h
index 322971e..6e7a7b8 100644
--- a/src/server/meta_store.h
+++ b/src/server/meta_store.h
@@ -30,21 +30,33 @@ public:
rocksdb::ColumnFamilyHandle *meta_cf)
const;
uint32_t get_data_version() const;
uint64_t get_last_manual_compact_finish_time() const;
+ std::string get_usage_scenario() const;
void set_last_flushed_decree(uint64_t decree) const;
void set_data_version(uint32_t version) const;
void set_last_manual_compact_finish_time(uint64_t
last_manual_compact_finish_time) const;
+ void set_usage_scenario(const std::string &usage_scenario) const;
private:
::dsn::error_code
get_value_from_meta_cf(bool read_flushed_data, const std::string &key,
uint64_t *value) const;
+ ::dsn::error_code get_string_value_from_meta_cf(bool read_flushed_data,
+ const std::string &key,
+ std::string *value) const;
::dsn::error_code set_value_to_meta_cf(const std::string &key, uint64_t
value) const;
+ ::dsn::error_code set_string_value_to_meta_cf(const std::string &key,
+ const std::string &value)
const;
static ::dsn::error_code get_value_from_meta_cf(rocksdb::DB *db,
rocksdb::ColumnFamilyHandle *cf,
bool read_flushed_data,
const std::string &key,
uint64_t *value);
+ static ::dsn::error_code get_string_value_from_meta_cf(rocksdb::DB *db,
+
rocksdb::ColumnFamilyHandle *cf,
+ bool
read_flushed_data,
+ const std::string
&key,
+ std::string *value);
friend class pegasus_write_service;
diff --git a/src/server/pegasus_server_impl.cpp
b/src/server/pegasus_server_impl.cpp
index b181c44..b61bf31 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -8,6 +8,7 @@
#include <boost/lexical_cast.hpp>
#include <rocksdb/convenience.h>
#include <rocksdb/utilities/checkpoint.h>
+#include <rocksdb/utilities/options_util.h>
#include <dsn/utility/chrono_literals.h>
#include <dsn/utility/utils.h>
#include <dsn/utility/filesystem.h>
@@ -1326,14 +1327,45 @@ void pegasus_server_impl::on_clear_scanner(const
int64_t &args) { _context_cache
ddebug("%s: start to open rocksDB's rdb(%s)", replica_name(),
path.c_str());
+ // Here we create a `tmp_data_cf_opts` because we don't want to modify
`_data_cf_opts`, which
+ // will be used elsewhere.
+ rocksdb::ColumnFamilyOptions tmp_data_cf_opts = _data_cf_opts;
if (db_exist) {
- // When DB exist, meta CF must be present.
bool missing_meta_cf = true;
- if (check_meta_cf(path, &missing_meta_cf) != ::dsn::ERR_OK) {
- derror_replica("check meta column family failed");
+ bool missing_data_cf = true;
+ // Load latest options from option file stored in the db directory.
+ rocksdb::DBOptions loaded_db_opt;
+ std::vector<rocksdb::ColumnFamilyDescriptor> loaded_cf_descs;
+ rocksdb::ColumnFamilyOptions loaded_data_cf_opts;
+ // Set `ignore_unknown_options` true for forward compatibility.
+ auto status = rocksdb::LoadLatestOptions(path,
+ rocksdb::Env::Default(),
+ &loaded_db_opt,
+ &loaded_cf_descs,
+
/*ignore_unknown_options=*/true);
+ if (!status.ok()) {
+ derror_replica("load latest option file failed.");
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
+ for (int i = 0; i < loaded_cf_descs.size(); ++i) {
+ if (loaded_cf_descs[i].name == META_COLUMN_FAMILY_NAME) {
+ missing_meta_cf = false;
+ } else if (loaded_cf_descs[i].name == DATA_COLUMN_FAMILY_NAME) {
+ missing_data_cf = false;
+ loaded_data_cf_opts = loaded_cf_descs[i].options;
+ } else {
+ derror_replica("unknown column family name.");
+ return ::dsn::ERR_LOCAL_APP_FAILURE;
+ }
+ }
+ // When DB exists, meta CF and data CF must be present.
dassert_replica(!missing_meta_cf, "You must upgrade Pegasus server
from 2.0");
+ dassert_replica(!missing_data_cf, "Missing data column family");
+ // Reset usage scenario related options according to
loaded_data_cf_opts.
+ // We don't use `loaded_data_cf_opts` directly because pointer-typed
options will only be
+ // initialized with default values when calling 'LoadLatestOptions',
see
+ // 'rocksdb/utilities/options_util.h'.
+ reset_usage_scenario_options(loaded_data_cf_opts, &tmp_data_cf_opts);
} else {
// When create new DB, we have to create a new column family to store
meta data (meta column
// family).
@@ -1341,7 +1373,13 @@ void pegasus_server_impl::on_clear_scanner(const int64_t
&args) { _context_cache
}
std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
- {{DATA_COLUMN_FAMILY_NAME, _data_cf_opts}, {META_COLUMN_FAMILY_NAME,
_meta_cf_opts}});
+ {{DATA_COLUMN_FAMILY_NAME, tmp_data_cf_opts},
{META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
+ auto s = rocksdb::CheckOptionsCompatibility(
+ path, rocksdb::Env::Default(), _db_opts, column_families,
/*ignore_unknown_options=*/true);
+ if (!s.ok() && !s.IsNotFound()) {
+ derror_replica("rocksdb::CheckOptionsCompatibility failed, error =
{}", s.ToString());
+ return ::dsn::ERR_LOCAL_APP_FAILURE;
+ }
std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
auto status = rocksdb::DB::Open(_db_opts, path, column_families,
&handles_opened, &_db);
if (!status.ok()) {
@@ -1360,6 +1398,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t
&args) { _context_cache
if (db_exist) {
_last_committed_decree = _meta_store->get_last_flushed_decree();
_pegasus_data_version = _meta_store->get_data_version();
+ _usage_scenario = _meta_store->get_usage_scenario();
uint64_t last_manual_compact_finish_time =
_meta_store->get_last_manual_compact_finish_time();
if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) {
@@ -1407,8 +1446,10 @@ void pegasus_server_impl::on_clear_scanner(const int64_t
&args) { _context_cache
_is_open = true;
- // set default usage scenario after db opened.
- set_usage_scenario(ROCKSDB_ENV_USAGE_SCENARIO_NORMAL);
+ if (!db_exist) {
+ // When creating a new db, update usage scenario according to app envs.
+ update_usage_scenario(envs);
+ }
dinfo_replica("start the update rocksdb statistics timer task");
_update_replica_rdb_stat =
@@ -2472,6 +2513,7 @@ bool pegasus_server_impl::set_usage_scenario(const
std::string &usage_scenario)
return false;
}
if (set_options(new_options)) {
+ _meta_store->set_usage_scenario(usage_scenario);
_usage_scenario = usage_scenario;
ddebug_replica(
"set usage scenario from \"{}\" to \"{}\" succeed",
old_usage_scenario, usage_scenario);
@@ -2483,6 +2525,23 @@ bool pegasus_server_impl::set_usage_scenario(const
std::string &usage_scenario)
}
}
+void pegasus_server_impl::reset_usage_scenario_options(
+ const rocksdb::ColumnFamilyOptions &base_opts,
rocksdb::ColumnFamilyOptions *target_opts)
+{
+ // reset usage scenario related options, refer to options set in
'set_usage_scenario' function.
+ target_opts->level0_file_num_compaction_trigger =
base_opts.level0_file_num_compaction_trigger;
+ target_opts->level0_slowdown_writes_trigger =
base_opts.level0_slowdown_writes_trigger;
+ target_opts->level0_stop_writes_trigger =
base_opts.level0_stop_writes_trigger;
+ target_opts->soft_pending_compaction_bytes_limit =
+ base_opts.soft_pending_compaction_bytes_limit;
+ target_opts->hard_pending_compaction_bytes_limit =
+ base_opts.hard_pending_compaction_bytes_limit;
+ target_opts->disable_auto_compactions = base_opts.disable_auto_compactions;
+ target_opts->max_compaction_bytes = base_opts.max_compaction_bytes;
+ target_opts->write_buffer_size = base_opts.write_buffer_size;
+ target_opts->max_write_buffer_number = base_opts.max_write_buffer_number;
+}
+
bool pegasus_server_impl::set_options(
const std::unordered_map<std::string, std::string> &new_options)
{
@@ -2607,25 +2666,6 @@ void pegasus_server_impl::set_partition_version(int32_t
partition_version)
// TODO(heyuchen): set filter _partition_version in further pr
}
-::dsn::error_code pegasus_server_impl::check_meta_cf(const std::string &path,
bool *missing_meta_cf)
-{
- *missing_meta_cf = true;
- std::vector<std::string> column_families;
- auto s = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(), path,
&column_families);
- if (!s.ok()) {
- derror_replica("rocksdb::DB::ListColumnFamilies failed, error = {}",
s.ToString());
- return ::dsn::ERR_LOCAL_APP_FAILURE;
- }
-
- for (const auto &column_family : column_families) {
- if (column_family == META_COLUMN_FAMILY_NAME) {
- *missing_meta_cf = false;
- break;
- }
- }
- return ::dsn::ERR_OK;
-}
-
::dsn::error_code pegasus_server_impl::flush_all_family_columns(bool wait)
{
rocksdb::FlushOptions options;
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 2ce6015..7165f76 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -166,6 +166,8 @@ private:
friend class pegasus_compression_options_test;
friend class pegasus_server_impl_test;
FRIEND_TEST(pegasus_server_impl_test, default_data_version);
+ FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_latest_options);
+ FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_app_envs);
friend class pegasus_manual_compact_service;
friend class pegasus_write_service;
@@ -260,6 +262,9 @@ private:
// return true if successfully changed
bool set_usage_scenario(const std::string &usage_scenario);
+ void reset_usage_scenario_options(const rocksdb::ColumnFamilyOptions
&base_opts,
+ rocksdb::ColumnFamilyOptions
*target_opts);
+
// return true if successfully set
bool set_options(const std::unordered_map<std::string, std::string>
&new_options);
@@ -305,8 +310,6 @@ private:
return false;
}
- ::dsn::error_code check_meta_cf(const std::string &path, bool
*missing_meta_cf);
-
void release_db();
void release_db(rocksdb::DB *db, const
std::vector<rocksdb::ColumnFamilyHandle *> &handles);
diff --git a/src/server/test/pegasus_server_impl_test.cpp
b/src/server/test/pegasus_server_impl_test.cpp
index c8c16b4..d9d161a 100644
--- a/src/server/test/pegasus_server_impl_test.cpp
+++ b/src/server/test/pegasus_server_impl_test.cpp
@@ -11,7 +11,7 @@ namespace server {
class pegasus_server_impl_test : public pegasus_server_test_base
{
public:
- pegasus_server_impl_test() : pegasus_server_test_base() { start(); }
+ pegasus_server_impl_test() : pegasus_server_test_base() {}
void test_table_level_slow_query()
{
@@ -59,12 +59,45 @@ public:
}
};
-TEST_F(pegasus_server_impl_test, test_table_level_slow_query) {
test_table_level_slow_query(); }
+TEST_F(pegasus_server_impl_test, test_table_level_slow_query)
+{
+ start();
+ test_table_level_slow_query();
+}
TEST_F(pegasus_server_impl_test, default_data_version)
{
+ start();
ASSERT_EQ(_server->_pegasus_data_version, 1);
}
+TEST_F(pegasus_server_impl_test, test_open_db_with_latest_options)
+{
+ // open a new db with no app env.
+ start();
+ ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_NORMAL, _server->_usage_scenario);
+ // set bulk_load scenario for the db.
+
ASSERT_TRUE(_server->set_usage_scenario(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD));
+ ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario);
+ rocksdb::Options opts = _server->_db->GetOptions();
+ ASSERT_EQ(1000000000, opts.level0_file_num_compaction_trigger);
+ ASSERT_EQ(true, opts.disable_auto_compactions);
+ // reopen the db.
+ _server->stop(false);
+ start();
+ ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario);
+ ASSERT_EQ(opts.level0_file_num_compaction_trigger,
+ _server->_db->GetOptions().level0_file_num_compaction_trigger);
+ ASSERT_EQ(opts.disable_auto_compactions,
_server->_db->GetOptions().disable_auto_compactions);
+}
+
+TEST_F(pegasus_server_impl_test, test_open_db_with_app_envs)
+{
+ std::map<std::string, std::string> envs;
+ envs[ROCKSDB_ENV_USAGE_SCENARIO_KEY] =
ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;
+ start(envs);
+ ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario);
+}
+
} // namespace server
} // namespace pegasus
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]