This is an automated email from the ASF dual-hosted git repository.
lorinlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new d866be27 Fix RetryPolicy blocks HealthCheck
new 92450981 Merge pull request #2419 from wwbmmm/fix-check-health
d866be27 is described below
commit d866be27ed607eb6e6b5350af4c01f28b69bce01
Author: wwbmmm <[email protected]>
AuthorDate: Thu Oct 19 20:04:49 2023 +0800
Fix RetryPolicy blocks HealthCheck
---
src/brpc/controller.cpp | 38 +++++++++++++++++++++++++++-----------
1 file changed, 27 insertions(+), 11 deletions(-)
diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp
index 25a5940e..5392f16c 100644
--- a/src/brpc/controller.cpp
+++ b/src/brpc/controller.cpp
@@ -662,18 +662,17 @@ void Controller::OnVersionedRPCReturned(const CompletionInfo& info,
// Retry backoff.
bthread::TaskGroup* g = bthread::tls_task_group;
- if (retry_policy->CanRetryBackoffInPthread() ||
- (g && !g->is_current_pthread_task())) {
- int64_t backoff_time_us = retry_policy->GetBackoffTimeMs(this) * 1000L;
+ int64_t backoff_time_us = retry_policy->GetBackoffTimeMs(this) * 1000L;
+ if (backoff_time_us > 0 &&
+ backoff_time_us < _deadline_us - butil::gettimeofday_us()) {
// No need to do retry backoff when the backoff time is longer than the remaining rpc time.
- if (backoff_time_us > 0 &&
- backoff_time_us < _deadline_us - butil::gettimeofday_us()) {
+ if (retry_policy->CanRetryBackoffInPthread() ||
+ (g && !g->is_current_pthread_task())) {
bthread_usleep(backoff_time_us);
+ } else {
+ LOG(WARNING) << "`CanRetryBackoffInPthread()' returns false, "
+ "skip retry backoff in pthread.";
}
-
- } else {
- LOG(WARNING) << "`CanRetryBackoffInPthread()' returns false, "
- "skip retry backoff in pthread.";
}
return IssueRPC(butil::gettimeofday_us());
}
@@ -1262,8 +1261,25 @@ int Controller::HandleSocketFailed(bthread_id_t id, void* data, int error_code,
cntl->SetFailed(error_code, "%s @%s", berror(error_code),
butil::endpoint2str(cntl->remote_side()).c_str());
}
- CompletionInfo info = { id, false };
- cntl->OnVersionedRPCReturned(info, true, saved_error);
+
+ struct OnVersionedRPCReturnedArgs {
+ bthread_id_t id;
+ Controller* cntl;
+ int error;
+ };
+ auto func = [](void* p) -> void* {
+ std::unique_ptr<OnVersionedRPCReturnedArgs> args(static_cast<OnVersionedRPCReturnedArgs*>(p));
+ CompletionInfo info = { args->id, false };
+ args->cntl->OnVersionedRPCReturned(info, true, args->error);
+ return NULL;
+ };
+
+ auto* args = new OnVersionedRPCReturnedArgs{ id, cntl, saved_error };
+ bthread_t tid;
+ // RetryPolicy may block current bthread, so start a new bthread to run OnVersionedRPCReturned
+ if (!cntl->_retry_policy || bthread_start_background(&tid, NULL, func, args) != 0) {
+ func(args);
+ }
return 0;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]