liuxuting commented on issue #2944:
URL: https://github.com/apache/brpc/issues/2944#issuecomment-2862450698
client压测脚本:
#include <stdlib.h>
#include <unistd.h>

#include <atomic>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include <gflags/gflags.h>

#include "butil/atomicops.h"
#include "butil/fast_rand.h"
#include "butil/logging.h"
#include "butil/time.h"
#include "brpc/rdma/rdma_helper.h"
#include "brpc/server.h"
#include "brpc/channel.h"
#include "bthread/bthread.h"
#include "bvar/latency_recorder.h"
#include "bvar/variable.h"
#include "test.pb.h"
#ifdef BRPC_WITH_RDMA
// Command-line flags. Both server lists are '+'-separated "ip:port" entries.
DEFINE_string(protocol, "baidu_std", "Protocol type.");
DEFINE_string(servers, "0.0.0.0:8002+0.0.0.0:8002", "IP Address of servers");
// The help text was split across two physical lines inside the string
// literal, which is ill-formed C++; it is restored to a single literal here.
DEFINE_string(tcp_servers, "0.0.0.0:8002+0.0.0.0:8002", "IP Address of servers");
DEFINE_int32(rpc_timeout_ms, 2000, "RPC call timeout");
DEFINE_int32(port, 8002, "TCP Port of this server");

// Server addresses used by the RDMA client and by the TCP client respectively.
std::vector<std::string> g_servers;
std::vector<std::string> g_tcp_servers;

// Round-robin cursor into the server lists. It is incremented from both the
// RDMA and the TCP client threads, so it has to be atomic.
std::atomic<int> rr_index(0);
class PerformanceTest {
public:
PerformanceTest(int attachment_size, bool echo_attachment)
: _addr(NULL)
, _channel(NULL)
, _start_time(0)
, _iterations(0)
, _stop(false)
{
if (attachment_size > 0) {
_addr = malloc(attachment_size);
butil::fast_rand_bytes(_addr, attachment_size);
_attachment.append(_addr, attachment_size);
}
_echo_attachment = echo_attachment;
}
~PerformanceTest() {
if (_addr) {
free(_addr);
}
delete _channel;
}
inline bool IsStop() { return _stop; }
int Init(bool use_rdma) {
brpc::ChannelOptions options;
options.use_rdma = use_rdma;
options.protocol = FLAGS_protocol;
options.timeout_ms = FLAGS_rpc_timeout_ms;
options.max_retry = 1000;
std::string server;
if (use_rdma) {
server = g_servers[(rr_index++) % g_servers.size()];
} else {
server = g_tcp_servers[(rr_index++) % g_tcp_servers.size()];
}
_channel = new brpc::Channel();
if (_channel->Init(server.c_str(), &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
exit(1);
}
brpc::Controller cntl;
test::PerfTestResponse response;
test::PerfTestRequest request;
request.set_echo_attachment(_echo_attachment);
test::PerfTestService_Stub stub(_channel);
stub.Test(&cntl, &request, &response, NULL);
if (cntl.Failed()) {
LOG(ERROR) << "RPC call failed: " << cntl.ErrorText();
//return -1;
}
return 0;
}
struct RespClosure {
brpc::Controller* cntl;
test::PerfTestResponse* resp;
PerformanceTest* test;
};
void SendRequest() {
RespClosure* closure = new RespClosure;
test::PerfTestRequest request;
closure->resp = new test::PerfTestResponse();
closure->cntl = new brpc::Controller();
request.set_echo_attachment(_echo_attachment);
closure->cntl->request_attachment().append(_attachment);
closure->test = this;
google::protobuf::Closure* done = brpc::NewCallback(&HandleResponse,
closure);
test::PerfTestService_Stub stub(_channel);
stub.Test(closure->cntl, &request, closure->resp, done);
}
static void HandleResponse(RespClosure* closure) {
std::unique_ptr<brpc::Controller> cntl_guard(closure->cntl);
std::unique_ptr<test::PerfTestResponse>
response_guard(closure->resp);
if (closure->cntl->Failed()) {
LOG(ERROR) << "RPC call failed: " << closure->cntl->ErrorText();
closure->test->_stop = true;
return;
}
cntl_guard.reset(NULL);
response_guard.reset(NULL);
closure->test->SendRequest();
}
static void* RunTest(void* arg) {
PerformanceTest* test = (PerformanceTest*)arg;
test->SendRequest();
return NULL;
}
private:
void* _addr;
brpc::Channel* _channel;
uint64_t _start_time;
uint32_t _iterations;
volatile bool _stop;
butil::IOBuf _attachment;
bool _echo_attachment;
};
static void* DeleteTest(void* arg) {
PerformanceTest* test = (PerformanceTest*)arg;
delete test;
return NULL;
}
void Test(int thread_num, int attachment_size, bool use_rdma) {
std::vector<PerformanceTest*> tests;
for (int k = 0; k < thread_num; ++k) {
PerformanceTest* t = new PerformanceTest(attachment_size, true);
if (t->Init(use_rdma) < 0) {
exit(1);
}
tests.push_back(t);
}
uint64_t start_time = butil::gettimeofday_us();
bthread_t tid[thread_num];
for (int k = 0; k < thread_num; ++k) {
bthread_start_background(&tid[k], &BTHREAD_ATTR_NORMAL,
PerformanceTest::RunTest, tests[k]);
}
for (int k = 0; k < thread_num; ++k) {
while (!tests[k]->IsStop()) {
bthread_usleep(10000);
}
}
uint64_t end_time = butil::gettimeofday_us();
for (int k = 0; k < thread_num; ++k) {
bthread_start_background(&tid[k], &BTHREAD_ATTR_NORMAL, DeleteTest,
tests[k]);
}
}
int client(int argc, char* argv[], bool use_rdma) {
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
// Initialize RDMA environment in advance.
if (use_rdma) {
std::string::size_type pos1 = 0;
std::string::size_type pos2 = FLAGS_servers.find('+');
while (pos2 != std::string::npos) {
pos1 = pos2 + 1;
pos2 = FLAGS_servers.find('+', pos1);
}
g_servers.push_back(FLAGS_servers.substr(pos1));
} else {
std::string::size_type pos1 = 0;
std::string::size_type pos2 = FLAGS_tcp_servers.find('+');
while (pos2 != std::string::npos) {
pos1 = pos2 + 1;
pos2 = FLAGS_tcp_servers.find('+', pos1);
}
g_tcp_servers.push_back(FLAGS_tcp_servers.substr(pos1));
}
for (int j = 1; j <= 1024; j *= 4) {
for (int i = 1; i <= 16; i *= 2) {
Test(i, j, use_rdma);
}
}
return 0;
}
namespace test {
class PerfTestServiceImpl : public PerfTestService {
public:
PerfTestServiceImpl() {}
~PerfTestServiceImpl() {}
void Test(google::protobuf::RpcController* cntl_base,
const PerfTestRequest* request,
PerfTestResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
if (request->echo_attachment()) {
brpc::Controller* cntl =
static_cast<brpc::Controller*>(cntl_base);
cntl->response_attachment().append(cntl->request_attachment());
}
}
};
}
int main(int argc, char* argv[]) {
std::thread client_thread([&](){
client(argc, argv, true);
});
std::cout << "=====start client=====" << std::endl;
//std::this_thread::sleep_for(std::chrono::milliseconds(10000));
client_thread.detach();
std::thread client2_thread([&](){
client(argc, argv, false);
});
client2_thread.join();
}
#else
int main(int argc, char* argv[]) {
LOG(ERROR) << " brpc is not compiled with rdma. To enable it, please
refer to https://github.com/apache/brpc/blob/master/docs/en/rdma.md";
return 0;
}
#endif
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]