This is an automated email from the ASF dual-hosted git repository.

yuchenhe pushed a commit to branch v2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git

commit 839826d288159767a4048eb9d961eb88cfd5d99c
Author: Zhang Yifan <[email protected]>
AuthorDate: Mon Aug 31 11:58:57 2020 +0800

    fix: Load options from file when open an exist DB (#587)
---
 rdsn                                         |  2 +-
 src/server/meta_store.cpp                    | 58 +++++++++++++++---
 src/server/meta_store.h                      | 12 ++++
 src/server/pegasus_server_impl.cpp           | 90 ++++++++++++++++++++--------
 src/server/pegasus_server_impl.h             |  7 ++-
 src/server/test/pegasus_server_impl_test.cpp | 37 +++++++++++-
 6 files changed, 169 insertions(+), 37 deletions(-)

diff --git a/rdsn b/rdsn
index 40d86f7..2158aeb 160000
--- a/rdsn
+++ b/rdsn
@@ -1 +1 @@
-Subproject commit 40d86f7f7aa51ba874d9cc91cb7e267f18b2bd11
+Subproject commit 2158aeb5022a9881c18bbad3addd02c68eb93323
diff --git a/src/server/meta_store.cpp b/src/server/meta_store.cpp
index e1562c4..fee4e51 100644
--- a/src/server/meta_store.cpp
+++ b/src/server/meta_store.cpp
@@ -58,6 +58,19 @@ uint64_t meta_store::get_decree_from_readonly_db(rocksdb::DB 
*db,
     return last_flushed_decree;
 }
 
+std::string meta_store::get_usage_scenario() const
+{
+    // If the usage scenario is not found in the meta column family, return 
normal by default.
+    std::string usage_scenario = ROCKSDB_ENV_USAGE_SCENARIO_NORMAL;
+    auto ec = get_string_value_from_meta_cf(false, 
ROCKSDB_ENV_USAGE_SCENARIO_KEY, &usage_scenario);
+    dassert_replica(ec == ::dsn::ERR_OK || ec == ::dsn::ERR_OBJECT_NOT_FOUND,
+                    "rocksdb {} get {} from meta column family failed: {}",
+                    _db->GetName(),
+                    ROCKSDB_ENV_USAGE_SCENARIO_KEY,
+                    ec.to_string());
+    return usage_scenario;
+}
+
 ::dsn::error_code meta_store::get_value_from_meta_cf(bool read_flushed_data,
                                                      const std::string &key,
                                                      uint64_t *value) const
@@ -72,18 +85,37 @@ uint64_t 
meta_store::get_decree_from_readonly_db(rocksdb::DB *db,
                                                      uint64_t *value)
 {
     std::string data;
+    auto ec = get_string_value_from_meta_cf(db, cf, read_flushed_data, key, 
&data);
+    if (ec != ::dsn::ERR_OK) {
+        return ec;
+    }
+    dassert_f(dsn::buf2uint64(data, *value),
+              "rocksdb {} get \"{}\" from meta column family failed to parse 
into uint64",
+              db->GetName(),
+              data);
+    return ::dsn::ERR_OK;
+}
+
+::dsn::error_code meta_store::get_string_value_from_meta_cf(bool 
read_flushed_data,
+                                                            const std::string 
&key,
+                                                            std::string 
*value) const
+{
+    return get_string_value_from_meta_cf(_db, _meta_cf, read_flushed_data, 
key, value);
+}
+
+::dsn::error_code meta_store::get_string_value_from_meta_cf(rocksdb::DB *db,
+                                                            
rocksdb::ColumnFamilyHandle *cf,
+                                                            bool 
read_flushed_data,
+                                                            const std::string 
&key,
+                                                            std::string *value)
+{
     rocksdb::ReadOptions rd_opts;
     if (read_flushed_data) {
         // only read 'flushed' data, mainly to read 'last_flushed_decree'
         rd_opts.read_tier = rocksdb::kPersistedTier;
     }
-    auto status = db->Get(rd_opts, cf, key, &data);
+    auto status = db->Get(rd_opts, cf, key, value);
     if (status.ok()) {
-        dassert_f(dsn::buf2uint64(data, *value),
-                  "rocksdb {} get {} from meta column family got error value 
{}",
-                  db->GetName(),
-                  key,
-                  data);
         return ::dsn::ERR_OK;
     }
 
@@ -97,7 +129,13 @@ uint64_t 
meta_store::get_decree_from_readonly_db(rocksdb::DB *db,
 
 ::dsn::error_code meta_store::set_value_to_meta_cf(const std::string &key, 
uint64_t value) const
 {
-    auto status = _db->Put(_wt_opts, _meta_cf, key, std::to_string(value));
+    return set_string_value_to_meta_cf(key, std::to_string(value));
+}
+
+::dsn::error_code meta_store::set_string_value_to_meta_cf(const std::string 
&key,
+                                                          const std::string 
&value) const
+{
+    auto status = _db->Put(_wt_opts, _meta_cf, key, value);
     if (!status.ok()) {
         derror_replica(
             "Put {}={} to meta column family failed, status {}", key, value, 
status.ToString());
@@ -124,5 +162,11 @@ void 
meta_store::set_last_manual_compact_finish_time(uint64_t last_manual_compac
         set_value_to_meta_cf(LAST_MANUAL_COMPACT_FINISH_TIME, 
last_manual_compact_finish_time));
 }
 
+void meta_store::set_usage_scenario(const std::string &usage_scenario) const
+{
+    dcheck_eq_replica(::dsn::ERR_OK,
+                      
set_string_value_to_meta_cf(ROCKSDB_ENV_USAGE_SCENARIO_KEY, usage_scenario));
+}
+
 } // namespace server
 } // namespace pegasus
diff --git a/src/server/meta_store.h b/src/server/meta_store.h
index 322971e..6e7a7b8 100644
--- a/src/server/meta_store.h
+++ b/src/server/meta_store.h
@@ -30,21 +30,33 @@ public:
                                          rocksdb::ColumnFamilyHandle *meta_cf) 
const;
     uint32_t get_data_version() const;
     uint64_t get_last_manual_compact_finish_time() const;
+    std::string get_usage_scenario() const;
 
     void set_last_flushed_decree(uint64_t decree) const;
     void set_data_version(uint32_t version) const;
     void set_last_manual_compact_finish_time(uint64_t 
last_manual_compact_finish_time) const;
+    void set_usage_scenario(const std::string &usage_scenario) const;
 
 private:
     ::dsn::error_code
     get_value_from_meta_cf(bool read_flushed_data, const std::string &key, 
uint64_t *value) const;
+    ::dsn::error_code get_string_value_from_meta_cf(bool read_flushed_data,
+                                                    const std::string &key,
+                                                    std::string *value) const;
     ::dsn::error_code set_value_to_meta_cf(const std::string &key, uint64_t 
value) const;
+    ::dsn::error_code set_string_value_to_meta_cf(const std::string &key,
+                                                  const std::string &value) 
const;
 
     static ::dsn::error_code get_value_from_meta_cf(rocksdb::DB *db,
                                                     
rocksdb::ColumnFamilyHandle *cf,
                                                     bool read_flushed_data,
                                                     const std::string &key,
                                                     uint64_t *value);
+    static ::dsn::error_code get_string_value_from_meta_cf(rocksdb::DB *db,
+                                                           
rocksdb::ColumnFamilyHandle *cf,
+                                                           bool 
read_flushed_data,
+                                                           const std::string 
&key,
+                                                           std::string *value);
 
     friend class pegasus_write_service;
 
diff --git a/src/server/pegasus_server_impl.cpp 
b/src/server/pegasus_server_impl.cpp
index b181c44..b61bf31 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -8,6 +8,7 @@
 #include <boost/lexical_cast.hpp>
 #include <rocksdb/convenience.h>
 #include <rocksdb/utilities/checkpoint.h>
+#include <rocksdb/utilities/options_util.h>
 #include <dsn/utility/chrono_literals.h>
 #include <dsn/utility/utils.h>
 #include <dsn/utility/filesystem.h>
@@ -1326,14 +1327,45 @@ void pegasus_server_impl::on_clear_scanner(const 
int64_t &args) { _context_cache
 
     ddebug("%s: start to open rocksDB's rdb(%s)", replica_name(), 
path.c_str());
 
+    // Here we create a `tmp_data_cf_opts` because we don't want to modify 
`_data_cf_opts`, which
+    // will be used elsewhere.
+    rocksdb::ColumnFamilyOptions tmp_data_cf_opts = _data_cf_opts;
     if (db_exist) {
-        // When DB exist, meta CF must be present.
         bool missing_meta_cf = true;
-        if (check_meta_cf(path, &missing_meta_cf) != ::dsn::ERR_OK) {
-            derror_replica("check meta column family failed");
+        bool missing_data_cf = true;
+        // Load latest options from option file stored in the db directory.
+        rocksdb::DBOptions loaded_db_opt;
+        std::vector<rocksdb::ColumnFamilyDescriptor> loaded_cf_descs;
+        rocksdb::ColumnFamilyOptions loaded_data_cf_opts;
+        // Set `ignore_unknown_options` to true for forward compatibility.
+        auto status = rocksdb::LoadLatestOptions(path,
+                                                 rocksdb::Env::Default(),
+                                                 &loaded_db_opt,
+                                                 &loaded_cf_descs,
+                                                 
/*ignore_unknown_options=*/true);
+        if (!status.ok()) {
+            derror_replica("load latest option file failed.");
             return ::dsn::ERR_LOCAL_APP_FAILURE;
         }
+        for (int i = 0; i < loaded_cf_descs.size(); ++i) {
+            if (loaded_cf_descs[i].name == META_COLUMN_FAMILY_NAME) {
+                missing_meta_cf = false;
+            } else if (loaded_cf_descs[i].name == DATA_COLUMN_FAMILY_NAME) {
+                missing_data_cf = false;
+                loaded_data_cf_opts = loaded_cf_descs[i].options;
+            } else {
+                derror_replica("unknown column family name.");
+                return ::dsn::ERR_LOCAL_APP_FAILURE;
+            }
+        }
+        // When DB exists, meta CF and data CF must be present.
         dassert_replica(!missing_meta_cf, "You must upgrade Pegasus server 
from 2.0");
+        dassert_replica(!missing_data_cf, "Missing data column family");
+        // Reset usage scenario related options according to 
loaded_data_cf_opts.
+        // We don't use `loaded_data_cf_opts` directly because pointer-typed 
options will only be
+        // initialized with default values when calling 'LoadLatestOptions', 
see
+        // 'rocksdb/utilities/options_util.h'.
+        reset_usage_scenario_options(loaded_data_cf_opts, &tmp_data_cf_opts);
     } else {
         // When create new DB, we have to create a new column family to store 
meta data (meta column
         // family).
@@ -1341,7 +1373,13 @@ void pegasus_server_impl::on_clear_scanner(const int64_t 
&args) { _context_cache
     }
 
     std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
-        {{DATA_COLUMN_FAMILY_NAME, _data_cf_opts}, {META_COLUMN_FAMILY_NAME, 
_meta_cf_opts}});
+        {{DATA_COLUMN_FAMILY_NAME, tmp_data_cf_opts}, 
{META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
+    auto s = rocksdb::CheckOptionsCompatibility(
+        path, rocksdb::Env::Default(), _db_opts, column_families, 
/*ignore_unknown_options=*/true);
+    if (!s.ok() && !s.IsNotFound()) {
+        derror_replica("rocksdb::CheckOptionsCompatibility failed, error = 
{}", s.ToString());
+        return ::dsn::ERR_LOCAL_APP_FAILURE;
+    }
     std::vector<rocksdb::ColumnFamilyHandle *> handles_opened;
     auto status = rocksdb::DB::Open(_db_opts, path, column_families, 
&handles_opened, &_db);
     if (!status.ok()) {
@@ -1360,6 +1398,7 @@ void pegasus_server_impl::on_clear_scanner(const int64_t 
&args) { _context_cache
     if (db_exist) {
         _last_committed_decree = _meta_store->get_last_flushed_decree();
         _pegasus_data_version = _meta_store->get_data_version();
+        _usage_scenario = _meta_store->get_usage_scenario();
         uint64_t last_manual_compact_finish_time =
             _meta_store->get_last_manual_compact_finish_time();
         if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) {
@@ -1407,8 +1446,10 @@ void pegasus_server_impl::on_clear_scanner(const int64_t 
&args) { _context_cache
 
     _is_open = true;
 
-    // set default usage scenario after db opened.
-    set_usage_scenario(ROCKSDB_ENV_USAGE_SCENARIO_NORMAL);
+    if (!db_exist) {
+        // When creating a new db, update the usage scenario according to app envs.
+        update_usage_scenario(envs);
+    }
 
     dinfo_replica("start the update rocksdb statistics timer task");
     _update_replica_rdb_stat =
@@ -2472,6 +2513,7 @@ bool pegasus_server_impl::set_usage_scenario(const 
std::string &usage_scenario)
         return false;
     }
     if (set_options(new_options)) {
+        _meta_store->set_usage_scenario(usage_scenario);
         _usage_scenario = usage_scenario;
         ddebug_replica(
             "set usage scenario from \"{}\" to \"{}\" succeed", 
old_usage_scenario, usage_scenario);
@@ -2483,6 +2525,23 @@ bool pegasus_server_impl::set_usage_scenario(const 
std::string &usage_scenario)
     }
 }
 
+void pegasus_server_impl::reset_usage_scenario_options(
+    const rocksdb::ColumnFamilyOptions &base_opts, 
rocksdb::ColumnFamilyOptions *target_opts)
+{
+    // reset usage scenario related options, refer to options set in 
'set_usage_scenario' function.
+    target_opts->level0_file_num_compaction_trigger = 
base_opts.level0_file_num_compaction_trigger;
+    target_opts->level0_slowdown_writes_trigger = 
base_opts.level0_slowdown_writes_trigger;
+    target_opts->level0_stop_writes_trigger = 
base_opts.level0_stop_writes_trigger;
+    target_opts->soft_pending_compaction_bytes_limit =
+        base_opts.soft_pending_compaction_bytes_limit;
+    target_opts->hard_pending_compaction_bytes_limit =
+        base_opts.hard_pending_compaction_bytes_limit;
+    target_opts->disable_auto_compactions = base_opts.disable_auto_compactions;
+    target_opts->max_compaction_bytes = base_opts.max_compaction_bytes;
+    target_opts->write_buffer_size = base_opts.write_buffer_size;
+    target_opts->max_write_buffer_number = base_opts.max_write_buffer_number;
+}
+
 bool pegasus_server_impl::set_options(
     const std::unordered_map<std::string, std::string> &new_options)
 {
@@ -2607,25 +2666,6 @@ void pegasus_server_impl::set_partition_version(int32_t 
partition_version)
     // TODO(heyuchen): set filter _partition_version in further pr
 }
 
-::dsn::error_code pegasus_server_impl::check_meta_cf(const std::string &path, 
bool *missing_meta_cf)
-{
-    *missing_meta_cf = true;
-    std::vector<std::string> column_families;
-    auto s = rocksdb::DB::ListColumnFamilies(rocksdb::DBOptions(), path, 
&column_families);
-    if (!s.ok()) {
-        derror_replica("rocksdb::DB::ListColumnFamilies failed, error = {}", 
s.ToString());
-        return ::dsn::ERR_LOCAL_APP_FAILURE;
-    }
-
-    for (const auto &column_family : column_families) {
-        if (column_family == META_COLUMN_FAMILY_NAME) {
-            *missing_meta_cf = false;
-            break;
-        }
-    }
-    return ::dsn::ERR_OK;
-}
-
 ::dsn::error_code pegasus_server_impl::flush_all_family_columns(bool wait)
 {
     rocksdb::FlushOptions options;
diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h
index 2ce6015..7165f76 100644
--- a/src/server/pegasus_server_impl.h
+++ b/src/server/pegasus_server_impl.h
@@ -166,6 +166,8 @@ private:
     friend class pegasus_compression_options_test;
     friend class pegasus_server_impl_test;
     FRIEND_TEST(pegasus_server_impl_test, default_data_version);
+    FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_latest_options);
+    FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_app_envs);
 
     friend class pegasus_manual_compact_service;
     friend class pegasus_write_service;
@@ -260,6 +262,9 @@ private:
     // return true if successfully changed
     bool set_usage_scenario(const std::string &usage_scenario);
 
+    void reset_usage_scenario_options(const rocksdb::ColumnFamilyOptions 
&base_opts,
+                                      rocksdb::ColumnFamilyOptions 
*target_opts);
+
     // return true if successfully set
     bool set_options(const std::unordered_map<std::string, std::string> 
&new_options);
 
@@ -305,8 +310,6 @@ private:
         return false;
     }
 
-    ::dsn::error_code check_meta_cf(const std::string &path, bool 
*missing_meta_cf);
-
     void release_db();
     void release_db(rocksdb::DB *db, const 
std::vector<rocksdb::ColumnFamilyHandle *> &handles);
 
diff --git a/src/server/test/pegasus_server_impl_test.cpp 
b/src/server/test/pegasus_server_impl_test.cpp
index c8c16b4..d9d161a 100644
--- a/src/server/test/pegasus_server_impl_test.cpp
+++ b/src/server/test/pegasus_server_impl_test.cpp
@@ -11,7 +11,7 @@ namespace server {
 class pegasus_server_impl_test : public pegasus_server_test_base
 {
 public:
-    pegasus_server_impl_test() : pegasus_server_test_base() { start(); }
+    pegasus_server_impl_test() : pegasus_server_test_base() {}
 
     void test_table_level_slow_query()
     {
@@ -59,12 +59,45 @@ public:
     }
 };
 
-TEST_F(pegasus_server_impl_test, test_table_level_slow_query) { 
test_table_level_slow_query(); }
+TEST_F(pegasus_server_impl_test, test_table_level_slow_query)
+{
+    start();
+    test_table_level_slow_query();
+}
 
 TEST_F(pegasus_server_impl_test, default_data_version)
 {
+    start();
     ASSERT_EQ(_server->_pegasus_data_version, 1);
 }
 
+TEST_F(pegasus_server_impl_test, test_open_db_with_latest_options)
+{
+    // open a new db with no app env.
+    start();
+    ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_NORMAL, _server->_usage_scenario);
+    // set bulk_load scenario for the db.
+    
ASSERT_TRUE(_server->set_usage_scenario(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD));
+    ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario);
+    rocksdb::Options opts = _server->_db->GetOptions();
+    ASSERT_EQ(1000000000, opts.level0_file_num_compaction_trigger);
+    ASSERT_EQ(true, opts.disable_auto_compactions);
+    // reopen the db.
+    _server->stop(false);
+    start();
+    ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario);
+    ASSERT_EQ(opts.level0_file_num_compaction_trigger,
+              _server->_db->GetOptions().level0_file_num_compaction_trigger);
+    ASSERT_EQ(opts.disable_auto_compactions, 
_server->_db->GetOptions().disable_auto_compactions);
+}
+
+TEST_F(pegasus_server_impl_test, test_open_db_with_app_envs)
+{
+    std::map<std::string, std::string> envs;
+    envs[ROCKSDB_ENV_USAGE_SCENARIO_KEY] = 
ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;
+    start(envs);
+    ASSERT_EQ(ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD, _server->_usage_scenario);
+}
+
 } // namespace server
 } // namespace pegasus


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to