This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git
The following commit(s) were added to refs/heads/master by this push:
new d0b71ffc fix rpc_replay can't send request equably (#1910)
d0b71ffc is described below
commit d0b71ffc34b73cbec39cd9773b01137f85b4ffde
Author: bumingchun <[email protected]>
AuthorDate: Mon Aug 29 10:27:22 2022 +0800
fix rpc_replay can't send request equably (#1910)
* fix rpc_replay can't send request equably
* 类型修改
---
tools/rpc_replay/rpc_replay.cpp | 42 ++++++++++++++++++++---------------------
1 file changed, 20 insertions(+), 22 deletions(-)
diff --git a/tools/rpc_replay/rpc_replay.cpp b/tools/rpc_replay/rpc_replay.cpp
index d022e3cb..a08fbefa 100644
--- a/tools/rpc_replay/rpc_replay.cpp
+++ b/tools/rpc_replay/rpc_replay.cpp
@@ -135,14 +135,10 @@ static void* replay_thread(void* arg) {
double req_rate = FLAGS_qps / (double)FLAGS_thread_num;
brpc::SerializedRequest req;
brpc::NsheadMessage nshead_req;
- std::deque<int64_t> timeq;
- size_t MAX_QUEUE_SIZE = (size_t)req_rate;
- if (MAX_QUEUE_SIZE < 100) {
- MAX_QUEUE_SIZE = 100;
- } else if (MAX_QUEUE_SIZE > 2000) {
- MAX_QUEUE_SIZE = 2000;
- }
- timeq.push_back(butil::gettimeofday_us());
+ int64_t last_expected_time = butil::monotonic_time_ns();
+ const int64_t interval = (int64_t) (1000000000L / req_rate);
+ // the max tolerant delay between end_time and expected_time. 10ms or 10
intervals
+ int64_t max_tolerant_delay = std::max((int64_t) 10000000L, 10 * interval);
for (int i = 0; !brpc::IsAskedToQuit() && i < FLAGS_times; ++i) {
brpc::SampleIterator it(FLAGS_dir);
int j = 0;
@@ -199,21 +195,15 @@ static void* replay_thread(void* arg) {
brpc::NewCallback(handle_response, cntl, start_time,
false);
chan->CallMethod(NULL/*use rpc_dump_context in cntl instead*/,
cntl, req_ptr, NULL/*ignore response*/, done);
- const int64_t end_time = butil::gettimeofday_us();
- int64_t expected_elp = 0;
- int64_t actual_elp = 0;
- timeq.push_back(end_time);
- if (timeq.size() > MAX_QUEUE_SIZE) {
- actual_elp = end_time - timeq.front();
- timeq.pop_front();
- expected_elp = (size_t)(1000000 * timeq.size() / req_rate);
- } else {
- actual_elp = end_time - timeq.front();
- expected_elp = (size_t)(1000000 * (timeq.size() - 1) /
req_rate);
- }
- if (actual_elp < expected_elp) {
- bthread_usleep(expected_elp - actual_elp);
+ int64_t end_time = butil::monotonic_time_ns();
+ int64_t expected_time = last_expected_time + interval;
+ if (end_time < expected_time) {
+ usleep((expected_time - end_time)/1000);
}
+ if (end_time - expected_time > max_tolerant_delay) {
+ expected_time = end_time;
+ }
+ last_expected_time = expected_time;
}
}
}
@@ -254,6 +244,14 @@ int main(int argc, char* argv[]) {
}
}
+ const int rate_limit_per_thread = 1000000;
+ int req_rate_per_thread = FLAGS_qps / FLAGS_thread_num;
+ if (req_rate_per_thread > rate_limit_per_thread) {
+ LOG(ERROR) << "req_rate: " << (int64_t) req_rate_per_thread << " is
too large in one thread. The rate limit is "
+ << rate_limit_per_thread << " in one thread";
+ return false;
+ }
+
std::vector<bthread_t> bids;
std::vector<pthread_t> pids;
if (!FLAGS_use_bthread) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]