trevor211 commented on code in PR #1836: URL: https://github.com/apache/incubator-brpc/pull/1836#discussion_r974982087
########## example/rdma_performance/client.cpp: ########## @@ -0,0 +1,309 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <stdlib.h> +#include <unistd.h> +#include <vector> +#include <gflags/gflags.h> +#include "butil/atomicops.h" +#include "butil/fast_rand.h" +#include "butil/logging.h" +#include "brpc/rdma/rdma_helper.h" +#include "brpc/server.h" +#include "brpc/channel.h" +#include "bthread/bthread.h" +#include "bvar/latency_recorder.h" +#include "bvar/variable.h" +#include "test.pb.h" + +DEFINE_int32(thread_num, 0, "How many threads are used"); +DEFINE_int32(queue_depth, 1, "How many requests can be pending in the queue"); +DEFINE_int32(expected_qps, 0, "The expected QPS"); +DEFINE_int32(max_thread_num, 16, "The max number of threads are used"); +DEFINE_int32(attachment_size, -1, "Attachment size is used (in Bytes)"); +DEFINE_bool(echo_attachment, false, "Select whether attachment should be echo"); +DEFINE_string(connection_type, "single", "Connection type of the channel"); +DEFINE_string(protocol, "baidu_std", "Protocol type."); +DEFINE_string(servers, "0.0.0.0:8002+0.0.0.0:8002", "IP Address of servers"); +DEFINE_bool(use_rdma, true, "Use RDMA or not"); +DEFINE_int32(rpc_timeout_ms, 2000, "RPC 
call timeout"); +DEFINE_int32(test_seconds, 20, "Test running time"); +DEFINE_int32(test_iterations, 0, "Test iterations"); + +bvar::LatencyRecorder g_latency_recorder("client"); +bvar::LatencyRecorder g_server_cpu_recorder("server_cpu"); +bvar::LatencyRecorder g_client_cpu_recorder("client_cpu"); +butil::atomic<uint64_t> g_last_time(0); +butil::atomic<uint64_t> g_total_bytes; +butil::atomic<uint64_t> g_total_cnt; +std::vector<std::string> g_servers; +int rr_index = 0; +volatile bool g_stop = false; + +butil::atomic<int64_t> g_token(10000); + +static void* GenerateToken(void* arg) { + int64_t start_time = butil::monotonic_time_ns(); + int64_t accumulative_token = g_token.load(butil::memory_order_relaxed); + while (!g_stop) { + bthread_usleep(100000); + int64_t now = butil::monotonic_time_ns(); + if (accumulative_token * 1000000000 / (now - start_time) < FLAGS_expected_qps) { + int64_t delta = FLAGS_expected_qps * (now - start_time) / 1000000000 - accumulative_token; + g_token.fetch_add(delta, butil::memory_order_relaxed); + accumulative_token += delta; + } + } + return NULL; +} + +class PerformanceTest { +public: + PerformanceTest(int attachment_size, bool echo_attachment) + : _addr(NULL) + , _channel(NULL) + , _start_time(0) + , _iterations(0) + , _stop(false) + { + if (attachment_size > 0) { + _addr = malloc(attachment_size); + butil::fast_rand_bytes(_addr, attachment_size); + _attachment.append(_addr, attachment_size); + } + _echo_attachment = echo_attachment; + } + + ~PerformanceTest() { + if (_addr) { + free(_addr); + } + delete _channel; + } + + inline bool IsStop() { return _stop; } + + int Init() { + brpc::ChannelOptions options; + options.use_rdma = FLAGS_use_rdma; + options.protocol = FLAGS_protocol; + options.connection_type = FLAGS_connection_type; + options.timeout_ms = FLAGS_rpc_timeout_ms; + options.max_retry = 0; + std::string server = g_servers[(rr_index++) % g_servers.size()]; + _channel = new brpc::Channel(); + if (_channel->Init(server.c_str(), 
&options) != 0) { + LOG(ERROR) << "Fail to initialize channel"; + return -1; + } + brpc::Controller cntl; + test::PerfTestResponse response; + test::PerfTestRequest request; + request.set_echo_attachment(_echo_attachment); + test::PerfTestService_Stub stub(_channel); + stub.Test(&cntl, &request, &response, NULL); + if (cntl.Failed()) { + LOG(ERROR) << "RPC call failed: " << cntl.ErrorText(); + return -1; + } + return 0; + } + + struct RespClosure { + brpc::Controller* cntl; + test::PerfTestResponse* resp; + PerformanceTest* test; + }; + + void SendRequest() { + if (FLAGS_expected_qps > 0) { + while (g_token.load(butil::memory_order_relaxed) <= 0) { + bthread_usleep(10); + } + g_token.fetch_sub(1, butil::memory_order_relaxed); + } + RespClosure* closure = new RespClosure; + test::PerfTestRequest request; + closure->resp = new test::PerfTestResponse(); + closure->cntl = new brpc::Controller(); + request.set_echo_attachment(_echo_attachment); + closure->cntl->request_attachment().append(_attachment); + closure->test = this; + google::protobuf::Closure* done = brpc::NewCallback(&HandleResponse, closure); + test::PerfTestService_Stub stub(_channel); + stub.Test(closure->cntl, &request, closure->resp, done); + } + + static void HandleResponse(RespClosure* closure) { + std::unique_ptr<brpc::Controller> cntl_guard(closure->cntl); + std::unique_ptr<test::PerfTestResponse> response_guard(closure->resp); + if (closure->cntl->Failed()) { + LOG(ERROR) << "RPC call failed: " << closure->cntl->ErrorText(); + closure->test->_stop = true; + return; + } + + g_latency_recorder << closure->cntl->latency_us(); + if (closure->resp->cpu_usage().size() > 0) { + g_server_cpu_recorder << atof(closure->resp->cpu_usage().c_str()) * 100; + } + g_total_bytes.fetch_add(closure->cntl->request_attachment().size(), butil::memory_order_relaxed); + g_total_cnt.fetch_add(1, butil::memory_order_relaxed); + + cntl_guard.reset(NULL); + response_guard.reset(NULL); + + if (closure->test->_iterations == 0 
&& FLAGS_test_iterations > 0) { + closure->test->_stop = true; + return; + } + --closure->test->_iterations; + uint64_t last = g_last_time.load(butil::memory_order_relaxed); + uint64_t now = butil::gettimeofday_us(); + if (now > last && now - last > 100000) { + if (g_last_time.exchange(now, butil::memory_order_relaxed) == last) { + g_client_cpu_recorder << + atof(bvar::Variable::describe_exposed("process_cpu_usage").c_str()) * 100; + } + } + if (now - closure->test->_start_time > FLAGS_test_seconds * 1000000u) { + closure->test->_stop = true; + return; + } + closure->test->SendRequest(); + } + + static void* RunTest(void* arg) { + PerformanceTest* test = (PerformanceTest*)arg; + test->_start_time = butil::gettimeofday_us(); + test->_iterations = FLAGS_test_iterations; + + for (int i = 0; i < FLAGS_queue_depth; ++i) { + test->SendRequest(); + } + + return NULL; + } + +private: + void* _addr; + brpc::Channel* _channel; + uint64_t _start_time; + uint32_t _iterations; + volatile bool _stop; + butil::IOBuf _attachment; + bool _echo_attachment; +}; + +static void* DeleteTest(void* arg) { + PerformanceTest* test = (PerformanceTest*)arg; + delete test; + return NULL; +} + +void Test(int thread_num, int attachment_size) { + std::cout << "[Threads: " << thread_num + << ", Depth: " << FLAGS_queue_depth + << ", Attachment: " << attachment_size << "B" + << ", RDMA: " << (FLAGS_use_rdma ? "yes" : "no") + << ", Echo: " << (FLAGS_echo_attachment ? 
"yes]" : "no]") + << std::endl; + g_total_bytes.store(0, butil::memory_order_relaxed); + g_total_cnt.store(0, butil::memory_order_relaxed); + std::vector<PerformanceTest*> tests; + for (int k = 0; k < thread_num; ++k) { + PerformanceTest* t = new PerformanceTest(attachment_size, FLAGS_echo_attachment); + if (t->Init() < 0) { + exit(1); + } + tests.push_back(t); + } + uint64_t start_time = butil::gettimeofday_us(); + bthread_t tid[thread_num]; + if (FLAGS_expected_qps > 0) { + bthread_t tid; + bthread_start_background(&tid, &BTHREAD_ATTR_NORMAL, GenerateToken, NULL); + } + for (int k = 0; k < thread_num; ++k) { + bthread_start_background(&tid[k], &BTHREAD_ATTR_NORMAL, + PerformanceTest::RunTest, tests[k]); + } + for (int k = 0; k < thread_num; ++k) { + while (!tests[k]->IsStop()) { + bthread_usleep(10000); + } + } + uint64_t end_time = butil::gettimeofday_us(); + double throughput = g_total_bytes / 1.048576 / (end_time - start_time); + if (FLAGS_test_iterations == 0) { + std::cout << "Avg-Latency: " << g_latency_recorder.latency(10) + << ", 90th-Latency: " << g_latency_recorder.latency_percentile(0.9) + << ", 99th-Latency: " << g_latency_recorder.latency_percentile(0.99) + << ", 99.9th-Latency: " << g_latency_recorder.latency_percentile(0.999) + << ", Throughput: " << throughput << "MB/s" + << ", QPS: " << (g_total_cnt.load(butil::memory_order_relaxed) * 1000 / (end_time - start_time)) << "k" + << ", Server CPU-utilization: " << g_server_cpu_recorder.latency(10) << "\%" + << ", Client CPU-utilization: " << g_client_cpu_recorder.latency(10) << "\%" + << std::endl; + } else { + std::cout << " Throughput: " << throughput << "MB/s" << std::endl; + } + g_stop = true; + for (int k = 0; k < thread_num; ++k) { + bthread_start_background(&tid[k], &BTHREAD_ATTR_NORMAL, DeleteTest, tests[k]); + } +} + +int main(int argc, char* argv[]) { + GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); + + // Initialize RDMA environment in advance. 
+ if (FLAGS_use_rdma) { + brpc::rdma::GlobalRdmaInitializeOrDie(); + } + + brpc::StartDummyServerAt(8001); Review Comment: Maybe we can make the dummy port configurable by adding a gflag, say `dummy_port`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
