This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 976c588f circuit breaker with half open state (#2634)
976c588f is described below
commit 976c588f011e28222cc667b0c3694dc88d684ab6
Author: jiangyt-git <[email protected]>
AuthorDate: Mon Jun 3 16:25:35 2024 +0800
circuit breaker with half open state (#2634)
* circuit breaker with half-open state
* add a switch for the half-open state
* add half-open unit test
* add some descriptions
* record all latencies of the half-open window
* fix some typos
---------
Co-authored-by: jiangyuting <[email protected]>
---
src/brpc/circuit_breaker.cpp | 32 +++++++++++++++++--
src/brpc/circuit_breaker.h | 2 ++
test/brpc_circuit_breaker_unittest.cpp | 57 ++++++++++++++++++++++++++++++++++
3 files changed, 88 insertions(+), 3 deletions(-)
diff --git a/src/brpc/circuit_breaker.cpp b/src/brpc/circuit_breaker.cpp
index 889fe65f..785ec77a 100644
--- a/src/brpc/circuit_breaker.cpp
+++ b/src/brpc/circuit_breaker.cpp
@@ -21,6 +21,7 @@
#include <gflags/gflags.h>
#include "brpc/errno.pb.h"
+#include "brpc/reloadable_flags.h"
#include "butil/time.h"
namespace brpc {
@@ -45,6 +46,12 @@ DEFINE_int32(circuit_breaker_max_isolation_duration_ms,
30000,
"Maximum isolation duration in milliseconds");
DEFINE_double(circuit_breaker_epsilon_value, 0.02,
"ema_alpha = 1 - std::pow(epsilon, 1.0 / window_size)");
+DEFINE_int32(circuit_breaker_half_open_window_size, 0,
+ "The limited number of requests allowed to pass through by the half-open "
+ "window. Only if all of them are successful, the circuit breaker will "
+ "go to the closed state. Otherwise, it goes back to the open state. "
+ "Values == 0 disables this feature");
+BRPC_VALIDATE_GFLAG(circuit_breaker_half_open_window_size, NonNegativeInteger);
namespace {
// EPSILON is used to generate the smoothing coefficient when calculating EMA.
@@ -132,7 +139,7 @@ bool
CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost,
if (ema_latency != 0) {
error_cost = std::min(ema_latency * max_mutiple, error_cost);
}
- //Errorous response
+ // Errorous response
if (error_cost != 0) {
int64_t ema_error_cost =
_ema_error_cost.fetch_add(error_cost, butil::memory_order_relaxed);
@@ -142,7 +149,7 @@ bool
CircuitBreaker::EmaErrorRecorder::UpdateErrorCost(int64_t error_cost,
return ema_error_cost <= max_error_cost;
}
- //Ordinary response
+ // Ordinary response
int64_t ema_error_cost = _ema_error_cost.load(butil::memory_order_relaxed);
do {
if (ema_error_cost == 0) {
@@ -171,7 +178,9 @@ CircuitBreaker::CircuitBreaker()
, _last_reset_time_ms(0)
, _isolation_duration_ms(FLAGS_circuit_breaker_min_isolation_duration_ms)
, _isolated_times(0)
- , _broken(false) {
+ , _broken(false)
+ , _half_open(false)
+ , _half_open_success_count(0) {
}
bool CircuitBreaker::OnCallEnd(int error_code, int64_t latency) {
@@ -188,6 +197,19 @@ bool CircuitBreaker::OnCallEnd(int error_code, int64_t
latency) {
if (_broken.load(butil::memory_order_relaxed)) {
return false;
}
+ if (FLAGS_circuit_breaker_half_open_window_size > 0
+ && _half_open.load(butil::memory_order_relaxed)) {
+ if (error_code != 0) {
+ MarkAsBroken();
+ return false;
+ }
+ if (_half_open_success_count.fetch_add(1, butil::memory_order_relaxed)
+ + 1 == FLAGS_circuit_breaker_half_open_window_size) {
+ _half_open.store(false, butil::memory_order_relaxed);
+ _half_open_success_count.store(0, butil::memory_order_relaxed);
+ }
+ }
+
if (_long_window.OnCallEnd(error_code, latency) &&
_short_window.OnCallEnd(error_code, latency)) {
return true;
@@ -201,6 +223,10 @@ void CircuitBreaker::Reset() {
_short_window.Reset();
_last_reset_time_ms = butil::cpuwide_time_ms();
_broken.store(false, butil::memory_order_release);
+ if (FLAGS_circuit_breaker_half_open_window_size > 0) {
+ _half_open.store(true, butil::memory_order_relaxed);
+ _half_open_success_count.store(0, butil::memory_order_relaxed);
+ }
}
void CircuitBreaker::MarkAsBroken() {
diff --git a/src/brpc/circuit_breaker.h b/src/brpc/circuit_breaker.h
index 826e6914..b16a4299 100644
--- a/src/brpc/circuit_breaker.h
+++ b/src/brpc/circuit_breaker.h
@@ -87,6 +87,8 @@ private:
butil::atomic<int> _isolation_duration_ms;
butil::atomic<int> _isolated_times;
butil::atomic<bool> _broken;
+ butil::atomic<bool> _half_open;
+ butil::atomic<int32_t> _half_open_success_count;
};
} // namespace brpc
diff --git a/test/brpc_circuit_breaker_unittest.cpp
b/test/brpc_circuit_breaker_unittest.cpp
index ef09cd94..e8f55153 100644
--- a/test/brpc_circuit_breaker_unittest.cpp
+++ b/test/brpc_circuit_breaker_unittest.cpp
@@ -45,6 +45,7 @@ const int kErrorCodeForSucc = 0;
const int kErrorCost = 1000;
const int kLatency = 1000;
const int kThreadNum = 3;
+const int kHalfWindowSize = 0;
} // namespace
namespace brpc {
@@ -54,6 +55,7 @@ DECLARE_int32(circuit_breaker_short_window_error_percent);
DECLARE_int32(circuit_breaker_long_window_error_percent);
DECLARE_int32(circuit_breaker_min_isolation_duration_ms);
DECLARE_int32(circuit_breaker_max_isolation_duration_ms);
+DECLARE_int32(circuit_breaker_half_open_window_size);
} // namespace brpc
int main(int argc, char* argv[]) {
@@ -63,6 +65,7 @@ int main(int argc, char* argv[]) {
brpc::FLAGS_circuit_breaker_long_window_error_percent =
kLongWindowErrorPercent;
brpc::FLAGS_circuit_breaker_min_isolation_duration_ms =
kMinIsolationDurationMs;
brpc::FLAGS_circuit_breaker_max_isolation_duration_ms =
kMaxIsolationDurationMs;
+ brpc::FLAGS_circuit_breaker_half_open_window_size = kHalfWindowSize;
testing::InitGoogleTest(&argc, argv);
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
return RUN_ALL_TESTS();
@@ -160,6 +163,60 @@ TEST_F(CircuitBreakerTest, should_isolate) {
}
}
+TEST_F(CircuitBreakerTest, should_isolate_with_half_open) {
+ std::vector<pthread_t> thread_list;
+ std::vector<std::unique_ptr<FeedbackControl>> fc_list;
+ StartFeedbackThread(&thread_list, &fc_list, 100);
+ int total_failed = 0;
+ for (int i = 0; i < kThreadNum; ++i) {
+ void* ret_data = nullptr;
+ ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
+ FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
+ EXPECT_GT(fc->_unhealthy_cnt, 0);
+ EXPECT_FALSE(fc->_healthy);
+ total_failed += fc->_unhealthy_cnt;
+ }
+ _circuit_breaker.Reset();
+ // Second run: the half-open flag is still 0, so Reset() does not enter half-open.
+ int total_failed1 = 0;
+ StartFeedbackThread(&thread_list, &fc_list, 100);
+ for (int i = 0; i < kThreadNum; ++i) {
+ void* ret_data = nullptr;
+ ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
+ FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
+ EXPECT_FALSE(fc->_healthy);
+ EXPECT_LE(fc->_healthy_cnt, kShortWindowSize);
+ EXPECT_GT(fc->_unhealthy_cnt, 0);
+ total_failed1 += fc->_unhealthy_cnt;
+ }
+ // Third run: enable the half-open state with a window of 10 requests.
+ // While half-open, the first failed call makes OnCallEnd() invoke
+ // MarkAsBroken(), so _broken becomes true immediately (not via the EMA windows).
+ brpc::FLAGS_circuit_breaker_half_open_window_size = 10;
+ _circuit_breaker.Reset();
+ int total_failed2 = 0;
+ StartFeedbackThread(&thread_list, &fc_list, 100);
+ for (int i = 0; i < kThreadNum; ++i) {
+ void* ret_data = nullptr;
+ ASSERT_EQ(pthread_join(thread_list[i], &ret_data), 0);
+ FeedbackControl* fc = static_cast<FeedbackControl*>(ret_data);
+ EXPECT_FALSE(fc->_healthy);
+ EXPECT_LE(fc->_healthy_cnt, kShortWindowSize);
+ EXPECT_GT(fc->_unhealthy_cnt, 0);
+ total_failed2 += fc->_unhealthy_cnt;
+ }
+ brpc::FLAGS_circuit_breaker_half_open_window_size = 0;
+ // Runs 1 and 2 (half-open off) must fail equally often; run 3 is checked separately.
+ EXPECT_EQ(kLongWindowSize * 2 * kThreadNum -
+ kShortWindowSize *
+ brpc::FLAGS_circuit_breaker_short_window_error_percent /
+ 100,
+ total_failed);
+
+ EXPECT_EQ(total_failed1, total_failed);
+ EXPECT_EQ(kLongWindowSize * 2 * kThreadNum, total_failed2);
+}
+
TEST_F(CircuitBreakerTest, isolation_duration_grow_and_reset) {
std::vector<pthread_t> thread_list;
std::vector<std::unique_ptr<FeedbackControl>> fc_list;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]