This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c26e64ac2f2 [chore](cloud) Implement idempotent injection framework
for meta-service (#51905)
c26e64ac2f2 is described below
commit c26e64ac2f2eaa6e8455e5b3a200b799645e0b75
Author: Gavin Chou <[email protected]>
AuthorDate: Sat Jun 21 16:37:52 2025 +0800
[chore](cloud) Implement idempotent injection framework for meta-service
(#51905)
This commit adds support for injecting idempotent requests by replaying
each request after a random delay.
The following configs are added:
```
CONF_mBool(enable_idempotent_request_injection, "false");
// idempotent_request_replay_delay_ms =
idempotent_request_replay_delay_base_ms +
random(-idempotent_request_replay_delay_range_ms,
idempotent_request_replay_delay_range_ms)
CONF_mInt64(idempotent_request_replay_delay_base_ms, "10000");
CONF_mInt64(idempotent_request_replay_delay_range_ms, "5000");
// exclude some request that are meaningless to replay, comma separated
list. e.g. GetTabletStatsRequest,GetVersionRequest
CONF_mString(idempotent_request_replay_exclusion,
"GetTabletStatsRequest,GetVersionRequest");
```
To enable request replay:
```
enable_idempotent_request_injection=true
```
The replay delay is chosen randomly from the range
[idempotent_request_replay_delay_base_ms -
idempotent_request_replay_delay_range_ms,
idempotent_request_replay_delay_base_ms +
idempotent_request_replay_delay_range_ms]. The following example means
the request will be replayed with a delay ranging from 5000ms to 15000ms:
```
idempotent_request_replay_delay_base_ms=10000
idempotent_request_replay_delay_range_ms=5000
```
If the randomly chosen delay is negative, the request will not be
replayed (idempotent_request_replay_delay_range_ms can be set greater
than idempotent_request_replay_delay_base_ms, which makes negative delays possible).
To exclude some requests (by PB message name) from replay, the following example
means GetTabletStatsRequest and GetVersionRequest will not be replayed:
```
idempotent_request_replay_exclusion=GetTabletStatsRequest,GetVersionRequest
```
---
cloud/src/common/config.h | 8 ++++
cloud/src/meta-service/meta_service.h | 52 +++++++++++++++++++++++-
cloud/src/meta-service/meta_service_resource.cpp | 7 ----
3 files changed, 59 insertions(+), 8 deletions(-)
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 112fcf05be6..48fed8b1c33 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -35,6 +35,14 @@ CONF_Bool(use_mem_kv, "false");
CONF_Int32(meta_server_register_interval_ms, "20000");
CONF_Int32(meta_server_lease_ms, "60000");
+// for chaos testing
+CONF_mBool(enable_idempotent_request_injection, "false");
+// idempotent_request_replay_delay_ms =
idempotent_request_replay_delay_base_ms +
random(-idempotent_request_replay_delay_range_ms,
idempotent_request_replay_delay_range_ms)
+CONF_mInt64(idempotent_request_replay_delay_base_ms, "10000");
+CONF_mInt64(idempotent_request_replay_delay_range_ms, "5000");
+// exclude some request that are meaningless to replay, comma separated list.
e.g. GetTabletStatsRequest,GetVersionRequest
+CONF_mString(idempotent_request_replay_exclusion,
"GetTabletStatsRequest,GetVersionRequest");
+
CONF_Int64(fdb_txn_timeout_ms, "10000");
CONF_Int64(brpc_max_body_size, "3147483648");
CONF_Int64(brpc_socket_max_unwritten_bytes, "1073741824");
diff --git a/cloud/src/meta-service/meta_service.h
b/cloud/src/meta-service/meta_service.h
index e28872917b5..7d22a14ad03 100644
--- a/cloud/src/meta-service/meta_service.h
+++ b/cloud/src/meta-service/meta_service.h
@@ -48,6 +48,14 @@ void internal_get_rowset(Transaction* txn, int64_t start,
int64_t end,
const std::string& instance_id, int64_t tablet_id,
MetaServiceCode& code,
std::string& msg, GetRowsetResponse* response);
+// for wrapping stateful lambda to run in bthread
+static void* run_bthread_work(void* arg) {
+ auto f = reinterpret_cast<std::function<void()>*>(arg);
+ (*f)();
+ delete f;
+ return nullptr;
+}
+
class MetaServiceImpl : public cloud::MetaService {
public:
MetaServiceImpl(std::shared_ptr<TxnKv> txn_kv,
std::shared_ptr<ResourceManager> resource_mgr,
@@ -763,8 +771,12 @@ private:
static_assert(std::is_base_of_v<::google::protobuf::Message,
Response>);
using namespace std::chrono;
-
brpc::ClosureGuard done_guard(done);
+
+ // life span of this defer MUST be longer than `done`
+ std::unique_ptr<int, std::function<void(int*)>> defer_injection(
+ (int*)(0x01), [&, this](int*) { idempotent_injection(method,
req, resp); });
+
if (!config::enable_txn_store_retry) {
(impl_.get()->*method)(ctrl, req, resp, brpc::DoNothing());
if (DCHECK_IS_ON()) {
@@ -834,6 +846,44 @@ private:
}
}
+ template <typename Request, typename Response>
+ void idempotent_injection(MetaServiceMethod<Request, Response> method,
const Request* requ,
+ Response* resp) {
+ if (!config::enable_idempotent_request_injection) return;
+
+ using namespace std::chrono;
+ auto s = system_clock::now();
+ static std::mt19937_64
rng(duration_cast<milliseconds>(s.time_since_epoch()).count());
+ // clang-format off
+ // FIXME(gavin): make idempotent_request_replay_exclusion configurable
via HTTP
+ static auto exclusion = []{ std::istringstream
iss(config::idempotent_request_replay_exclusion); std::string e;
std::unordered_set<std::string> r;
+ while (std::getline(iss, e, ',')) { r.insert(e); } return r;
+ }();
+ auto f = new std::function<void()>([s, req = *requ, res = *resp,
method, this]() mutable { // copy and capture
+ auto dist =
std::uniform_int_distribution(-config::idempotent_request_replay_delay_range_ms,
+
config::idempotent_request_replay_delay_range_ms);
+ int64_t sleep_ms = config::idempotent_request_replay_delay_base_ms
+ dist(rng);
+ LOG(INFO) << " request_name=" << req.GetDescriptor()->name()
+ << " response_name=" << res.GetDescriptor()->name()
+ << " queue_ts=" <<
duration_cast<milliseconds>(s.time_since_epoch()).count()
+ << " now_ts=" <<
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count()
+ << " idempotent_request_replay_delay_base_ms=" <<
config::idempotent_request_replay_delay_base_ms
+ << " idempotent_request_replay_delay_range_ms=" <<
config::idempotent_request_replay_delay_range_ms
+ << " idempotent_request_replay_delay_ms=" << sleep_ms
+ << " request=" << req.ShortDebugString();
+ if (sleep_ms < 0 || exclusion.count(req.GetDescriptor()->name()))
return;
+ brpc::Controller ctrl;
+ bthread_usleep(sleep_ms * 1000);
+ (impl_.get()->*method)(&ctrl, &req, &res, brpc::DoNothing());
+ });
+ // clang-format on
+ bthread_t bid;
+ if (bthread_start_background(&bid, nullptr, run_bthread_work, f) != 0)
{
+ LOG(WARNING) << "failed to bthread_start_background, run in
current thread";
+ run_bthread_work(f);
+ }
+ }
+
std::unique_ptr<MetaServiceImpl> impl_;
};
diff --git a/cloud/src/meta-service/meta_service_resource.cpp
b/cloud/src/meta-service/meta_service_resource.cpp
index bbd94b577b1..8c8b0646c94 100644
--- a/cloud/src/meta-service/meta_service_resource.cpp
+++ b/cloud/src/meta-service/meta_service_resource.cpp
@@ -51,13 +51,6 @@ bool is_valid_storage_vault_name(const std::string& str) {
namespace doris::cloud {
-static void* run_bthread_work(void* arg) {
- auto f = reinterpret_cast<std::function<void()>*>(arg);
- (*f)();
- delete f;
- return nullptr;
-}
-
static std::string_view print_cluster_status(const ClusterStatus& status) {
switch (status) {
case ClusterStatus::UNKNOWN:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]