This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 12c8b56e2 fix: Fault-tolerant storage engine errors for read 
operations (#1447)
12c8b56e2 is described below

commit 12c8b56e25abcacdf70269f2ed67b37482a92693
Author: Yingchun Lai <[email protected]>
AuthorDate: Mon Apr 24 18:20:06 2023 +0800

    fix: Fault-tolerant storage engine errors for read operations (#1447)
    
    https://github.com/apache/incubator-pegasus/issues/1383
    
    ReplicaServer doesn't handle the errors returned from the storage engine, so
    even if the storage engine is corrupted, the server doesn't recognize these
    situations and keeps running as if nothing were wrong. However, the client
    always gets an error status.
    This situation does not recover automatically unless the server is stopped
    and the corrupted RocksDB directories are moved away manually.
    
    This patch handles the kCorruption error returned from the storage engine: it
    closes the replica and moves its directory to the ".err" trash path. The
    replica is then able to recover automatically (if RF > 1).
---
 src/common/storage_serverlet.h                     | 37 ++++++++++---
 src/replica/replica.cpp                            | 16 +++++-
 src/replica/replication_app_base.h                 |  2 +-
 src/replica/storage/simple_kv/simple_kv.server.h   |  6 ++-
 src/replica/test/mock_utils.h                      |  2 +-
 src/server/pegasus_read_service.h                  |  5 +-
 src/server/pegasus_server_impl.cpp                 | 10 ++++
 .../base_api_test/integration_test.cpp             | 62 ++++++++++++++++++++++
 8 files changed, 127 insertions(+), 13 deletions(-)

diff --git a/src/common/storage_serverlet.h b/src/common/storage_serverlet.h
index 30fa56287..ee1ef3bee 100644
--- a/src/common/storage_serverlet.h
+++ b/src/common/storage_serverlet.h
@@ -30,6 +30,8 @@
 #include <unordered_map>
 #include <functional>
 
+#include "replica/storage/simple_kv/simple_kv.code.definition.h"
+#include "rrdb/rrdb.code.definition.h"
 #include "runtime/api_task.h"
 #include "runtime/api_layer1.h"
 #include "runtime/app_model.h"
@@ -50,7 +52,7 @@ template <typename T>
 class storage_serverlet
 {
 protected:
-    typedef std::function<void(T *, dsn::message_ex *req)> rpc_handler;
+    typedef std::function<int(T *, dsn::message_ex *req)> rpc_handler;
     static std::unordered_map<std::string, rpc_handler> s_handlers;
     static std::vector<rpc_handler> s_vhandlers;
 
@@ -60,11 +62,19 @@ protected:
                                const char *name,
                                void (*handler)(T *svc, const TReq &req, 
rpc_replier<TResp> &resp))
     {
+        // Only allowed to register simple.kv rpc handler.
+        CHECK(dsn::replication::application::RPC_SIMPLE_KV_SIMPLE_KV_READ == 
rpc_code ||
+                  dsn::replication::application::RPC_SIMPLE_KV_SIMPLE_KV_WRITE 
== rpc_code ||
+                  
dsn::replication::application::RPC_SIMPLE_KV_SIMPLE_KV_APPEND == rpc_code,
+              "Not allowed to register with rpc_code {}",
+              rpc_code);
         rpc_handler h = [handler](T *p, dsn::message_ex *r) {
             TReq req;
             ::dsn::unmarshall(r, req);
             rpc_replier<TResp> replier(r->create_response());
             handler(p, req, replier);
+            // For simple.kv, always return 0 which means success.
+            return 0;
         };
 
         return register_async_rpc_handler(rpc_code, name, h);
@@ -76,7 +86,9 @@ protected:
                                                      void (*handler)(T *svc, 
TRpcHolder))
     {
         rpc_handler h = [handler](T *p, dsn::message_ex *request) {
-            handler(p, TRpcHolder::auto_reply(request));
+            auto rh = TRpcHolder::auto_reply(request);
+            handler(p, rh);
+            return rh.response().error;
         };
 
         return register_async_rpc_handler(rpc_code, name, h);
@@ -87,10 +99,17 @@ protected:
                                            const char *name,
                                            void (*handler)(T *svc, const TReq 
&req))
     {
+        // Only allowed to register RPC_RRDB_RRDB_CLEAR_SCANNER handler.
+        CHECK_EQ_MSG(dsn::apps::RPC_RRDB_RRDB_CLEAR_SCANNER,
+                     rpc_code,
+                     "Not allowed to register with rpc_code {}",
+                     rpc_code);
         rpc_handler h = [handler](T *p, dsn::message_ex *r) {
             TReq req;
             ::dsn::unmarshall(r, req);
             handler(p, req);
+            // For RPC_RRDB_RRDB_CLEAR_SCANNER, always return 0 which means 
success.
+            return 0;
         };
 
         return register_async_rpc_handler(rpc_code, name, h);
@@ -114,11 +133,15 @@ protected:
 
     static const rpc_handler *find_handler(dsn::task_code rpc_code)
     {
-        if (rpc_code < s_vhandlers.size() && s_vhandlers[rpc_code] != nullptr)
+        if (rpc_code < s_vhandlers.size() && s_vhandlers[rpc_code] != nullptr) 
{
             return &s_vhandlers[rpc_code];
+        }
+
         auto iter = s_handlers.find(rpc_code.to_string());
-        if (iter != s_handlers.end())
+        if (iter != s_handlers.end()) {
             return &(iter->second);
+        }
+
         return nullptr;
     }
 
@@ -128,16 +151,16 @@ protected:
         dsn::task_code t = request->rpc_code();
         const rpc_handler *ptr = find_handler(t);
         if (ptr != nullptr) {
-            // TODO(yingchun): add return value
-            (*ptr)(static_cast<T *>(this), request);
+            return (*ptr)(static_cast<T *>(this), request);
         } else {
             LOG_WARNING("recv message with unhandled rpc name {} from {}, 
trace_id = {:#018x} ",
                         t,
                         request->header->from_address,
                         request->header->trace_id);
             dsn_rpc_reply(request->create_response(), 
::dsn::ERR_HANDLER_NOT_FOUND);
+            // TODO(yingchun): return a non-zero value
+            return 0;
         }
-        return 0;
     }
 };
 
diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp
index 7e130bf07..bcaa762df 100644
--- a/src/replica/replica.cpp
+++ b/src/replica/replica.cpp
@@ -32,6 +32,7 @@
 #include <algorithm>
 #include <functional>
 #include <iosfwd>
+#include <rocksdb/status.h>
 #include <set>
 
 #include "backup/replica_backup_manager.h"
@@ -299,8 +300,19 @@ void replica::on_client_read(dsn::message_ex *request, 
bool ignore_throttling)
 
     uint64_t start_time_ns = dsn_now_ns();
     CHECK(_app, "");
-    // TODO(yingchun): check the return value.
-    _app->on_request(request);
+    auto storage_error = _app->on_request(request);
+    if (dsn_unlikely(storage_error != ERR_OK)) {
+        switch (storage_error) {
+        // TODO(yingchun): Now only kCorruption is dealt, consider to deal 
with more storage
+        //  engine errors.
+        case rocksdb::Status::kCorruption:
+            handle_local_failure(ERR_RDB_CORRUPTION);
+            break;
+        default:
+            LOG_ERROR_PREFIX("client read encountered an unhandled error: {}", 
storage_error);
+        }
+        return;
+    }
 
     // If the corresponding perf counter exist, count the duration of this 
operation.
     // rpc code of request is already checked in message_ex::rpc_code, so it 
will always be legal
diff --git a/src/replica/replication_app_base.h 
b/src/replica/replication_app_base.h
index 772270a5f..e48ef8a5d 100644
--- a/src/replica/replication_app_base.h
+++ b/src/replica/replication_app_base.h
@@ -222,7 +222,7 @@ public:
     //
     virtual replication::decree last_durable_decree() const = 0;
     // The return type is generated by storage engine, e.g. 
rocksdb::Status::Code, 0 always mean OK.
-    virtual int on_request(message_ex *request) = 0;
+    virtual int on_request(message_ex *request) WARN_UNUSED_RESULT = 0;
 
     //
     // Parameters:
diff --git a/src/replica/storage/simple_kv/simple_kv.server.h 
b/src/replica/storage/simple_kv/simple_kv.server.h
index 377d3d336..a3a614a4d 100644
--- a/src/replica/storage/simple_kv/simple_kv.server.h
+++ b/src/replica/storage/simple_kv/simple_kv.server.h
@@ -51,7 +51,11 @@ public:
     simple_kv_service(replica *r) : replication_app_base(r) {}
     virtual ~simple_kv_service() {}
 
-    virtual int on_request(dsn::message_ex *request) override { return 
handle_request(request); }
+    virtual int on_request(dsn::message_ex *request) override 
WARN_UNUSED_RESULT
+    {
+        return handle_request(request);
+    }
+
 protected:
     // all service handlers to be implemented further
     // RPC_SIMPLE_KV_SIMPLE_KV_READ
diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h
index e01c5612f..cece7c82e 100644
--- a/src/replica/test/mock_utils.h
+++ b/src/replica/test/mock_utils.h
@@ -76,7 +76,7 @@ public:
         utils::filesystem::create_file(fmt::format("{}/checkpoint.file", 
checkpoint_dir));
         return ERR_OK;
     }
-    int on_request(message_ex *request) override { return 0; }
+    int on_request(message_ex *request) override WARN_UNUSED_RESULT { return 
0; }
     std::string query_compact_state() const { return ""; };
 
     // we mock the followings
diff --git a/src/server/pegasus_read_service.h 
b/src/server/pegasus_read_service.h
index 163a529d3..645b48412 100644
--- a/src/server/pegasus_read_service.h
+++ b/src/server/pegasus_read_service.h
@@ -43,7 +43,10 @@ public:
     {
     }
 
-    int on_request(dsn::message_ex *request) override { return 
handle_request(request); }
+    int on_request(dsn::message_ex *request) override WARN_UNUSED_RESULT
+    {
+        return handle_request(request);
+    }
 
 protected:
     // all service handlers to be implemented further
diff --git a/src/server/pegasus_server_impl.cpp 
b/src/server/pegasus_server_impl.cpp
index ae1ce7e17..e6e9d48ed 100644
--- a/src/server/pegasus_server_impl.cpp
+++ b/src/server/pegasus_server_impl.cpp
@@ -114,6 +114,11 @@ DSN_DEFINE_int32(pegasus.server,
                  update_rdb_stat_interval,
                  60,
                  "The interval seconds to update RocksDB statistics, in 
seconds.");
+DSN_DEFINE_int32(pegasus.server,
+                 inject_read_error_for_test,
+                 0,
+                 "Which error code to inject in read path, 0 means no error. 
Only for test.");
+DSN_TAG_VARIABLE(inject_read_error_for_test, FT_MUTABLE);
 
 static std::string chkpt_get_dir_name(int64_t decree)
 {
@@ -329,6 +334,11 @@ void pegasus_server_impl::on_get(get_rpc rpc)
     resp.partition_index = _gpid.get_partition_index();
     resp.server = _primary_address;
 
+    if (dsn_unlikely(FLAGS_inject_read_error_for_test != 
rocksdb::Status::kOk)) {
+        resp.error = FLAGS_inject_read_error_for_test;
+        return;
+    }
+
     if (!_read_size_throttling_controller->available()) {
         rpc.error() = dsn::ERR_BUSY;
         _counter_recent_read_throttling_reject_count->increment();
diff --git a/src/test/function_test/base_api_test/integration_test.cpp 
b/src/test/function_test/base_api_test/integration_test.cpp
index 887921b3b..2ee44c482 100644
--- a/src/test/function_test/base_api_test/integration_test.cpp
+++ b/src/test/function_test/base_api_test/integration_test.cpp
@@ -139,3 +139,65 @@ TEST_F(integration_test, write_corrupt_db)
 
     ASSERT_IN_TIME([&] { ASSERT_EQ(3, get_alive_replica_server_count()); }, 
60);
 }
+
+TEST_F(integration_test, read_corrupt_db)
+{
+    // Make best effort to rebalance the cluster,
+    ASSERT_NO_FATAL_FAILURE(
+        run_cmd_from_project_root("echo 'set_meta_level lively' | ./run.sh 
shell"));
+    // Make sure RS-1 has some primaries of table 'temp'.
+    ASSERT_IN_TIME([&] { ASSERT_GT(get_leader_count("temp", 1), 0); }, 120);
+
+    // Inject a read error kCorruption to RS-1.
+    ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(
+        "curl 'localhost:34801/updateConfig?inject_read_error_for_test=2'"));
+
+    std::string skey = "skey";
+    std::string value = "value";
+    for (int i = 0; i < 1000; i++) {
+        std::string hkey = fmt::format("hkey.read_corrupt_db.{}", i);
+        ASSERT_EQ(PERR_OK, client_->set(hkey, skey, value));
+    }
+
+    int ok_count = 0;
+    int corruption_count = 0;
+    for (int i = 0; i < 1000; i++) {
+        std::string hkey = fmt::format("hkey.read_corrupt_db.{}", i);
+        std::string got_value;
+        int ret = PERR_OK;
+        do {
+            ret = client_->get(hkey, skey, got_value);
+            if (ret == PERR_OK) {
+                ASSERT_EQ(value, got_value);
+                ok_count++;
+                break;
+            } else if (ret == PERR_CORRUPTION) {
+                // Suppose there must some primaries on RS-1.
+                corruption_count++;
+                break;
+            } else if (ret == PERR_TIMEOUT) {
+                corruption_count++;
+                // If RS-1 crashed before (encounter a read kCorruption error 
from storage engine),
+                // a new read operation on the primary replica it ever held 
will cause timeout.
+                // Force to fetch the latest route table.
+                client_ =
+                    pegasus_client_factory::get_client(cluster_name_.c_str(), 
app_name_.c_str());
+                ASSERT_TRUE(client_ != nullptr);
+            } else {
+                ASSERT_TRUE(false) << ret;
+            }
+        } while (true);
+    }
+
+    ASSERT_GT(ok_count, 0);
+    ASSERT_GT(corruption_count, 0);
+    std::cout << "ok_count: " << ok_count << ", corruption_count: " << 
corruption_count
+              << std::endl;
+
+    // All replica servers in this cluster are healthy.
+    ASSERT_IN_TIME([&] { ASSERT_EQ(3, get_alive_replica_server_count()); }, 
60);
+
+    // Recover the injected read error for RS-1.
+    ASSERT_NO_FATAL_FAILURE(run_cmd_from_project_root(
+        "curl 'localhost:34801/updateConfig?inject_read_error_for_test=0'"));
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to