This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a6c28e4a fix: fix a crash bug when opening an incompletely created RocksDB instance (#1451)
7a6c28e4a is described below

commit 7a6c28e4ac45dafb3f41adc5335f727b7961710d
Author: Yingchun Lai <[email protected]>
AuthorDate: Fri Apr 21 15:46:34 2023 +0800

    fix: fix a crash bug when opening an incompletely created RocksDB instance (#1451)
    
    https://github.com/apache/incubator-pegasus/issues/1450
    
    If the replica server attempts to open an incomplete RocksDB instance (which may
    have been left behind by a previous crash), it crashes before moving the
    incomplete path to the ".err" trash path, and it crashes again every time the
    server is restarted.
    
    This patch avoids crashing before the incomplete RocksDB path is moved to the
    ".err" path, so the replica has an opportunity to recover automatically without
    the incomplete RocksDB path having to be moved manually.
---
 src/server/meta_store.cpp                          | 31 ++++++------
 src/server/meta_store.h                            |  6 +--
 src/server/pegasus_server_impl.cpp                 | 55 +++++++++++++++-------
 .../base_api_test/integration_test.cpp             |  7 ++-
 src/test/function_test/utils/global_env.cpp        |  4 +-
 src/test/function_test/utils/test_util.cpp         | 29 ++++++++----
 src/test/function_test/utils/utils.h               | 16 +++++--
 src/test_util/test_util.cpp                        | 28 +++++++++--
 src/test_util/test_util.h                          | 17 +++++--
 src/utils/fmt_logging.h                            |  1 +
 10 files changed, 139 insertions(+), 55 deletions(-)

diff --git a/src/server/meta_store.cpp b/src/server/meta_store.cpp
index b018151ed..3ba5187a0 100644
--- a/src/server/meta_store.cpp
+++ b/src/server/meta_store.cpp
@@ -44,29 +44,30 @@ meta_store::meta_store(pegasus_server_impl *server,
     _wt_opts.disableWAL = true;
 }
 
-uint64_t meta_store::get_last_flushed_decree() const
+dsn::error_code meta_store::get_last_flushed_decree(uint64_t *decree) const
 {
-    uint64_t last_flushed_decree = 0;
-    auto ec = get_value_from_meta_cf(true, LAST_FLUSHED_DECREE, 
&last_flushed_decree);
-    CHECK_EQ_PREFIX(::dsn::ERR_OK, ec);
-    return last_flushed_decree;
+    LOG_AND_RETURN_NOT_OK(ERROR_PREFIX,
+                          get_value_from_meta_cf(true, LAST_FLUSHED_DECREE, 
decree),
+                          "get_value_from_meta_cf failed");
+    return dsn::ERR_OK;
 }
 
-uint32_t meta_store::get_data_version() const
+dsn::error_code meta_store::get_data_version(uint32_t *version) const
 {
     uint64_t pegasus_data_version = 0;
-    auto ec = get_value_from_meta_cf(false, DATA_VERSION, 
&pegasus_data_version);
-    CHECK_EQ_PREFIX(::dsn::ERR_OK, ec);
-    return static_cast<uint32_t>(pegasus_data_version);
+    LOG_AND_RETURN_NOT_OK(ERROR_PREFIX,
+                          get_value_from_meta_cf(false, DATA_VERSION, 
&pegasus_data_version),
+                          "get_value_from_meta_cf failed");
+    *version = static_cast<uint32_t>(pegasus_data_version);
+    return dsn::ERR_OK;
 }
 
-uint64_t meta_store::get_last_manual_compact_finish_time() const
+dsn::error_code meta_store::get_last_manual_compact_finish_time(uint64_t *ts) 
const
 {
-    uint64_t last_manual_compact_finish_time = 0;
-    auto ec = get_value_from_meta_cf(
-        false, LAST_MANUAL_COMPACT_FINISH_TIME, 
&last_manual_compact_finish_time);
-    CHECK_EQ_PREFIX(::dsn::ERR_OK, ec);
-    return last_manual_compact_finish_time;
+    LOG_AND_RETURN_NOT_OK(ERROR_PREFIX,
+                          get_value_from_meta_cf(false, 
LAST_MANUAL_COMPACT_FINISH_TIME, ts),
+                          "get_value_from_meta_cf failed");
+    return dsn::ERR_OK;
 }
 
 uint64_t meta_store::get_decree_from_readonly_db(rocksdb::DB *db,
diff --git a/src/server/meta_store.h b/src/server/meta_store.h
index 8868e7f0c..247702657 100644
--- a/src/server/meta_store.h
+++ b/src/server/meta_store.h
@@ -45,11 +45,11 @@ class meta_store : public dsn::replication::replica_base
 public:
     meta_store(pegasus_server_impl *server, rocksdb::DB *db, 
rocksdb::ColumnFamilyHandle *meta_cf);
 
-    uint64_t get_last_flushed_decree() const;
+    dsn::error_code get_last_flushed_decree(uint64_t *decree) const;
     uint64_t get_decree_from_readonly_db(rocksdb::DB *db,
                                          rocksdb::ColumnFamilyHandle *meta_cf) 
const;
-    uint32_t get_data_version() const;
-    uint64_t get_last_manual_compact_finish_time() const;
+    dsn::error_code get_data_version(uint32_t *version) const;
+    dsn::error_code get_last_manual_compact_finish_time(uint64_t *ts) const;
     std::string get_usage_scenario() const;
 
     void set_last_flushed_decree(uint64_t decree) const;
diff --git a/src/server/pegasus_server_impl.cpp 
b/src/server/pegasus_server_impl.cpp
index a22c2fd3b..ae1ce7e17 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -73,6 +73,7 @@
 #include "utils/autoref_ptr.h"
 #include "utils/blob.h"
 #include "utils/chrono_literals.h"
+#include "utils/defer.h"
 #include "utils/filesystem.h"
 #include "utils/flags.h"
 #include "utils/fmt_logging.h"
@@ -1609,9 +1610,10 @@ dsn::error_code pegasus_server_impl::start(int argc, 
char **argv)
         // When DB exists, meta CF and data CF must be present.
         bool missing_meta_cf = true;
         bool missing_data_cf = true;
-        if (check_column_families(rdb_path, &missing_meta_cf, 
&missing_data_cf) != dsn::ERR_OK) {
+        auto ec = check_column_families(rdb_path, &missing_meta_cf, 
&missing_data_cf);
+        if (ec != dsn::ERR_OK) {
             LOG_ERROR_PREFIX("check column families failed");
-            return dsn::ERR_LOCAL_APP_FAILURE;
+            return ec;
         }
         CHECK_PREFIX_MSG(!missing_meta_cf, "You must upgrade Pegasus server 
from 2.0");
         CHECK_PREFIX_MSG(!missing_data_cf, "Missing data column family");
@@ -1688,19 +1690,29 @@ dsn::error_code pegasus_server_impl::start(int argc, 
char **argv)
     _meta_store = std::make_unique<meta_store>(this, _db, _meta_cf);
 
     if (db_exist) {
-        _last_committed_decree = _meta_store->get_last_flushed_decree();
-        _pegasus_data_version = _meta_store->get_data_version();
+        auto cleanup = dsn::defer([this]() { release_db(); });
+        uint64_t decree = 0;
+        LOG_AND_RETURN_NOT_OK(ERROR_PREFIX,
+                              _meta_store->get_last_flushed_decree(&decree),
+                              "get_last_flushed_decree failed");
+        _last_committed_decree.store(static_cast<int64_t>(decree));
+        LOG_AND_RETURN_NOT_OK(ERROR_PREFIX,
+                              
_meta_store->get_data_version(&_pegasus_data_version),
+                              "get_data_version failed");
         _usage_scenario = _meta_store->get_usage_scenario();
-        uint64_t last_manual_compact_finish_time =
-            _meta_store->get_last_manual_compact_finish_time();
-        if (_pegasus_data_version > PEGASUS_DATA_VERSION_MAX) {
-            LOG_ERROR_PREFIX("open app failed, unsupported data version {}", 
_pegasus_data_version);
-            release_db();
-            return dsn::ERR_LOCAL_APP_FAILURE;
-        }
-
+        uint64_t last_manual_compact_finish_time = 0;
+        LOG_AND_RETURN_NOT_OK(
+            ERROR_PREFIX,
+            
_meta_store->get_last_manual_compact_finish_time(&last_manual_compact_finish_time),
+            "get_last_manual_compact_finish_time failed");
+        LOG_AND_RETURN_NOT_TRUE(ERROR_PREFIX,
+                                _pegasus_data_version <= 
PEGASUS_DATA_VERSION_MAX,
+                                dsn::ERR_LOCAL_APP_FAILURE,
+                                "open app failed, unsupported data version {}",
+                                _pegasus_data_version);
         // update last manual compact finish timestamp
         
_manual_compact_svc.init_last_finish_time_ms(last_manual_compact_finish_time);
+        cleanup.cancel();
     } else {
         // Write initial meta data to meta CF and flush when create new DB.
         _meta_store->set_data_version(PEGASUS_DATA_VERSION_MAX);
@@ -1936,7 +1948,8 @@ private:
     {
         ::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_checkpoints_lock);
         CHECK_GT_PREFIX(last_commit, last_durable_decree());
-        int64_t last_flushed = 
static_cast<int64_t>(_meta_store->get_last_flushed_decree());
+        uint64_t last_flushed = 0;
+        CHECK_OK_PREFIX(_meta_store->get_last_flushed_decree(&last_flushed));
         CHECK_EQ_PREFIX(last_commit, last_flushed);
         if (!_checkpoints.empty()) {
             CHECK_GT_PREFIX(last_commit, _checkpoints.back());
@@ -1961,7 +1974,8 @@ private:
         return ::dsn::ERR_WRONG_TIMING;
 
     int64_t last_durable = last_durable_decree();
-    int64_t last_flushed = 
static_cast<int64_t>(_meta_store->get_last_flushed_decree());
+    uint64_t last_flushed = 0;
+    CHECK_OK_PREFIX(_meta_store->get_last_flushed_decree(&last_flushed));
     int64_t last_commit = last_committed_decree();
 
     CHECK_LE_PREFIX(last_durable, last_flushed);
@@ -3166,6 +3180,11 @@ bool pegasus_server_impl::set_options(
         return ::dsn::ERR_LOCAL_APP_FAILURE;
     }
 
+    if (column_families.empty()) {
+        LOG_ERROR_PREFIX("column families are empty");
+        return ::dsn::ERR_LOCAL_APP_FAILURE;
+    }
+
     for (const auto &column_family : column_families) {
         if (column_family == META_COLUMN_FAMILY_NAME) {
             *missing_meta_cf = false;
@@ -3220,7 +3239,10 @@ uint64_t pegasus_server_impl::do_manual_compact(const 
rocksdb::CompactRangeOptio
     // update rocksdb statistics immediately
     update_replica_rocksdb_statistics();
 
-    return _meta_store->get_last_manual_compact_finish_time();
+    uint64_t last_manual_compact_finish_time = 0;
+    CHECK_OK_PREFIX(
+        
_meta_store->get_last_manual_compact_finish_time(&last_manual_compact_finish_time));
+    return last_manual_compact_finish_time;
 }
 
 bool pegasus_server_impl::release_storage_after_manual_compact()
@@ -3246,7 +3268,8 @@ bool 
pegasus_server_impl::release_storage_after_manual_compact()
     gc_checkpoints(true);
     LOG_INFO_PREFIX("finish gc_checkpoints, time_used = {}ms", dsn_now_ms() - 
start_time);
 
-    int64_t new_last_durable = _meta_store->get_last_flushed_decree();
+    uint64_t new_last_durable = 0;
+    CHECK_OK_PREFIX(_meta_store->get_last_flushed_decree(&new_last_durable));
     if (new_last_durable > old_last_durable) {
         LOG_INFO_PREFIX("release storage succeed, last_durable_decree changed 
from {} to {}",
                         old_last_durable,
diff --git a/src/test/function_test/base_api_test/integration_test.cpp 
b/src/test/function_test/base_api_test/integration_test.cpp
index 027635e2d..887921b3b 100644
--- a/src/test/function_test/base_api_test/integration_test.cpp
+++ b/src/test/function_test/base_api_test/integration_test.cpp
@@ -66,6 +66,7 @@ TEST_F(integration_test, write_corrupt_db)
                 corruption_count++;
                 break;
             } else if (ret == PERR_TIMEOUT) {
+                corruption_count++;
                 // If RS-1 crashed before (learn failed when write storage 
engine but get
                 // kCorruption), a new write operation on the primary replica 
it ever held will
                 // cause timeout.
@@ -95,11 +96,13 @@ TEST_F(integration_test, write_corrupt_db)
         ASSERT_EQ(value, got_value);
     }
 
-    EXPECT_GT(ok_count, 0);
-    EXPECT_GT(corruption_count, 0);
+    ASSERT_GT(ok_count, 0);
+    ASSERT_GT(corruption_count, 0);
     std::cout << "ok_count: " << ok_count << ", corruption_count: " << 
corruption_count
               << std::endl;
 
+    // Make effort to get a trustable alive replica server count.
+    WAIT_IN_TIME([&] { return get_alive_replica_server_count() != 3; }, 30);
     // Now only 2 RSs left, or RS-1 has no leader replicas.
     ASSERT_IN_TIME(
         [&] {
diff --git a/src/test/function_test/utils/global_env.cpp 
b/src/test/function_test/utils/global_env.cpp
index 43b027cd2..efeea9691 100644
--- a/src/test/function_test/utils/global_env.cpp
+++ b/src/test/function_test/utils/global_env.cpp
@@ -45,12 +45,12 @@ global_env::global_env()
 void global_env::get_dirs()
 {
     std::string output1;
-    ASSERT_NO_FATAL_FAILURE(run_cmd(
+    ASSERT_NO_FATAL_FAILURE(run_cmd_no_error(
         "ps aux | grep '/meta1/pegasus_server' | grep -v grep | awk '{print 
$2}'", &output1));
 
     // get the dir of a process in onebox, say: $PEGASUS/onebox/meta1
     std::string output2;
-    ASSERT_NO_FATAL_FAILURE(run_cmd("readlink /proc/" + output1 + "/cwd", 
&output2));
+    ASSERT_NO_FATAL_FAILURE(run_cmd_no_error("readlink /proc/" + output1 + 
"/cwd", &output2));
 
     _pegasus_root = dirname(dirname((char *)output2.c_str()));
     std::cout << "Pegasus project root: " << _pegasus_root << std::endl;
diff --git a/src/test/function_test/utils/test_util.cpp 
b/src/test/function_test/utils/test_util.cpp
index f417379e9..d9b910681 100644
--- a/src/test/function_test/utils/test_util.cpp
+++ b/src/test/function_test/utils/test_util.cpp
@@ -21,6 +21,7 @@
 
 #include <nlohmann/json.hpp>
 #include <unistd.h>
+#include <algorithm>
 #include <fstream>
 #include <initializer_list>
 #include <utility>
@@ -43,18 +44,19 @@
 #include "utils/error_code.h"
 #include "utils/filesystem.h"
 #include "utils/rand.h"
-#include "utils/string_conv.h"
 
 using dsn::partition_configuration;
 using dsn::replication::replica_helper;
 using dsn::replication::replication_ddl_client;
 using dsn::rpc_address;
 using nlohmann::json;
+using std::map;
+using std::string;
 using std::vector;
 
 namespace pegasus {
 
-test_util::test_util(std::map<std::string, std::string> create_envs)
+test_util::test_util(map<string, string> create_envs)
     : cluster_name_("mycluster"), app_name_("temp"), 
create_envs_(std::move(create_envs))
 {
 }
@@ -93,10 +95,10 @@ void test_util::SetUp()
     ASSERT_EQ(partition_count_, partitions_.size());
 }
 
-void test_util::run_cmd_from_project_root(const std::string &cmd)
+void test_util::run_cmd_from_project_root(const string &cmd)
 {
     ASSERT_EQ(0, ::chdir(global_env::instance()._pegasus_root.c_str()));
-    ASSERT_NO_FATAL_FAILURE(run_cmd(cmd));
+    ASSERT_NO_FATAL_FAILURE(run_cmd_no_error(cmd));
 }
 
 int test_util::get_alive_replica_server_count()
@@ -107,14 +109,25 @@ int test_util::get_alive_replica_server_count()
     run_cmd_from_project_root(fmt::format("echo 'nodes -djo {}' | ./run.sh 
shell", json_filename));
     std::ifstream f(json_filename);
     const auto data = json::parse(f);
+    vector<string> rs_addrs;
+    for (const auto &rs : data["details"]) {
+        if (rs["status"] == "UNALIVE") {
+            continue;
+        }
+        rs_addrs.push_back(rs["address"]);
+    }
+
     int replica_server_count = 0;
-    if (!dsn::buf2int32(data["summary"]["alive_node_count"], 
replica_server_count)) {
-        return -1;
+    for (const auto &rs_addr : rs_addrs) {
+        int ret = run_cmd(fmt::format("curl {}/version", rs_addr));
+        if (ret == 0) {
+            replica_server_count++;
+        }
     }
     return replica_server_count;
 }
 
-int test_util::get_leader_count(const std::string &table_name, int 
replica_server_index)
+int test_util::get_leader_count(const string &table_name, int 
replica_server_index)
 {
     const auto json_filename = fmt::format("test_json_file.{}", 
dsn::rand::next_u32());
     auto cleanup =
@@ -126,7 +139,7 @@ int test_util::get_leader_count(const std::string 
&table_name, int replica_serve
     int leader_count = 0;
     for (const auto &replica : data["replicas"]) {
         const auto &primary = to_string(replica["primary"]);
-        if (primary.find(fmt::format("3480{}", replica_server_index)) != 
std::string::npos) {
+        if (primary.find(fmt::format("3480{}", replica_server_index)) != 
string::npos) {
             leader_count++;
         }
     }
diff --git a/src/test/function_test/utils/utils.h 
b/src/test/function_test/utils/utils.h
index b37526e6e..526bd3ab2 100644
--- a/src/test/function_test/utils/utils.h
+++ b/src/test/function_test/utils/utils.h
@@ -206,14 +206,24 @@ inline void compare(const T &expect, const U &actual)
     }
 }
 
-inline void run_cmd(const std::string &cmd, std::string *output = nullptr)
+inline int run_cmd(const std::string &cmd, std::string *output = nullptr)
 {
     std::stringstream ss;
     int ret = dsn::utils::pipe_execute(cmd.c_str(), ss);
+    if (output) {
+        *output = dsn::utils::trim_string((char *)ss.str().c_str());
+    }
+    return ret;
+}
+
+inline void run_cmd_no_error(const std::string &cmd, std::string *output = 
nullptr)
+{
+    std::string tmp;
+    int ret = run_cmd(cmd, &tmp);
     ASSERT_TRUE(ret == 0 || ret == 256) << "ret: " << ret << std::endl
                                         << "cmd: " << cmd << std::endl
-                                        << "output: " << ss.str();
+                                        << "output: " << tmp;
     if (output) {
-        *output = dsn::utils::trim_string((char *)ss.str().c_str());
+        *output = tmp;
     }
 }
diff --git a/src/test_util/test_util.cpp b/src/test_util/test_util.cpp
index 2b69c58d9..5ab185503 100644
--- a/src/test_util/test_util.cpp
+++ b/src/test_util/test_util.cpp
@@ -32,7 +32,7 @@
 
 namespace pegasus {
 
-void AssertEventually(const std::function<void(void)> &f, int timeout_sec, 
AssertBackoff backoff)
+void AssertEventually(const std::function<void(void)> &f, int timeout_sec, 
WaitBackoff backoff)
 {
     // TODO(yingchun): should use mono time
     uint64_t deadline = dsn_now_s() + timeout_sec;
@@ -69,10 +69,10 @@ void AssertEventually(const std::function<void(void)> &f, 
int timeout_sec, Asser
             // If they had failures, sleep and try again.
             int sleep_ms = 0;
             switch (backoff) {
-            case AssertBackoff::EXPONENTIAL:
+            case WaitBackoff::EXPONENTIAL:
                 sleep_ms = (attempts < 10) ? (1 << attempts) : 1000;
                 break;
-            case AssertBackoff::NONE:
+            case WaitBackoff::NONE:
                 sleep_ms = 1000;
                 break;
             default:
@@ -93,4 +93,26 @@ void AssertEventually(const std::function<void(void)> &f, 
int timeout_sec, Asser
     }
 }
 
+void WaitCondition(const std::function<bool(void)> &f, int timeout_sec, 
WaitBackoff backoff)
+{
+    uint64_t deadline = dsn_now_s() + timeout_sec;
+    for (int attempts = 0; dsn_now_s() < deadline; attempts++) {
+        if (f()) {
+            break;
+        }
+        int sleep_ms = 0;
+        switch (backoff) {
+        case WaitBackoff::EXPONENTIAL:
+            sleep_ms = (attempts < 10) ? (1 << attempts) : 1000;
+            break;
+        case WaitBackoff::NONE:
+            sleep_ms = 1000;
+            break;
+        default:
+            LOG_FATAL("Unknown backoff type");
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
+    }
+}
+
 } // namespace pegasus
diff --git a/src/test_util/test_util.h b/src/test_util/test_util.h
index 8ae2184ab..ec7bce6e2 100644
--- a/src/test_util/test_util.h
+++ b/src/test_util/test_util.h
@@ -39,7 +39,13 @@ namespace pegasus {
 
 #define ASSERT_IN_TIME_WITH_FIXED_INTERVAL(expr, sec)                          
                    \
     do {                                                                       
                    \
-        AssertEventually(expr, sec, AssertBackoff::NONE);                      
                    \
+        AssertEventually(expr, sec, WaitBackoff::NONE);                        
                    \
+        NO_PENDING_FATALS();                                                   
                    \
+    } while (0)
+
+#define WAIT_IN_TIME(expr, sec)                                                
                    \
+    do {                                                                       
                    \
+        WaitCondition(expr, sec);                                              
                    \
         NO_PENDING_FATALS();                                                   
                    \
     } while (0)
 
@@ -55,7 +61,7 @@ namespace pegasus {
 // To check whether AssertEventually() eventually succeeded, call
 // NO_PENDING_FATALS() afterward, or use ASSERT_EVENTUALLY() which performs
 // this check automatically.
-enum class AssertBackoff
+enum class WaitBackoff
 {
     // Use exponential back-off while looping, capped at one second.
     EXPONENTIAL,
@@ -65,6 +71,11 @@ enum class AssertBackoff
 };
 void AssertEventually(const std::function<void(void)> &f,
                       int timeout_sec = 30,
-                      AssertBackoff backoff = AssertBackoff::EXPONENTIAL);
+                      WaitBackoff backoff = WaitBackoff::EXPONENTIAL);
 
+// Wait until 'f()' succeeds or timeout, there is no GTest 'fatal failures'
+// regardless failed or timeout.
+void WaitCondition(const std::function<bool(void)> &f,
+                   int timeout_sec = 30,
+                   WaitBackoff backoff = WaitBackoff::EXPONENTIAL);
 } // namespace pegasus
diff --git a/src/utils/fmt_logging.h b/src/utils/fmt_logging.h
index 06db36ca7..1f6e96120 100644
--- a/src/utils/fmt_logging.h
+++ b/src/utils/fmt_logging.h
@@ -254,6 +254,7 @@ inline const char *null_str_printer(const char *s) { return 
s == nullptr ? "(nul
 #define CHECK_LE_PREFIX(var1, var2) CHECK_LE_PREFIX_MSG(var1, var2, "")
 #define CHECK_GT_PREFIX(var1, var2) CHECK_GT_PREFIX_MSG(var1, var2, "")
 #define CHECK_LT_PREFIX(var1, var2) CHECK_LT_PREFIX_MSG(var1, var2, "")
+#define CHECK_OK_PREFIX(x) CHECK_EQ_PREFIX_MSG(x, ::dsn::ERR_OK, "")
 
 // Return the given status if condition is not true.
 #define LOG_AND_RETURN_NOT_TRUE(level, s, err, ...)                            
                    \


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to