This is an automated email from the ASF dual-hosted git repository. jamesge pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git
The following commit(s) were added to refs/heads/master by this push: new 0bc9eaa replace example/partition_echo_c++/server.cpp with that in dynamic_partition_echo_c++ new 1430306 Merge pull request #792 from zyearn/optimize_server_in_partition_channel_ex 0bc9eaa is described below commit 0bc9eaa36383021eb5da73d572600ade881d8817 Author: zhujiashun <zhujiashun2...@gmail.com> AuthorDate: Fri May 31 15:31:36 2019 +0800 replace example/partition_echo_c++/server.cpp with that in dynamic_partition_echo_c++ --- example/partition_echo_c++/server.cpp | 139 +++++++++++++++++++++++++++------ example/partition_echo_c++/server_list | 11 ++- 2 files changed, 120 insertions(+), 30 deletions(-) diff --git a/example/partition_echo_c++/server.cpp b/example/partition_echo_c++/server.cpp index bd42714..2e0b7b1 100644 --- a/example/partition_echo_c++/server.cpp +++ b/example/partition_echo_c++/server.cpp @@ -14,9 +14,13 @@ // A server to receive EchoRequest and send back EchoResponse. +#include <vector> #include <gflags/gflags.h> +#include <butil/time.h> #include <butil/logging.h> #include <butil/string_printf.h> +#include <butil/string_splitter.h> +#include <butil/rand_util.h> #include <brpc/server.h> #include "echo.pb.h" @@ -27,57 +31,144 @@ DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " DEFINE_int32(logoff_ms, 2000, "Maximum duration of server's LOGOFF state " "(waiting for client to close connection before server stops)"); DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel"); +DEFINE_int32(server_num, 1, "Number of servers"); +DEFINE_string(sleep_us, "", "Sleep so many microseconds before responding"); +DEFINE_bool(spin, false, "spin rather than sleep"); +DEFINE_double(exception_ratio, 0.1, "Percentage of irregular latencies"); +DEFINE_double(min_ratio, 0.2, "min_sleep / sleep_us"); +DEFINE_double(max_ratio, 10, "max_sleep / sleep_us"); // Your implementation of example::EchoService class EchoServiceImpl : public example::EchoService { public: - EchoServiceImpl() {} - ~EchoServiceImpl() {}; - void Echo(google::protobuf::RpcController* cntl_base, - const example::EchoRequest* request, - example::EchoResponse* response, - google::protobuf::Closure* done) { + EchoServiceImpl() : _index(0) {} + virtual ~EchoServiceImpl() {}; + void set_index(size_t index, int64_t sleep_us) { + _index = index; + _sleep_us = sleep_us; + } + virtual void Echo(google::protobuf::RpcController* cntl_base, + const example::EchoRequest* request, + example::EchoResponse* response, + google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base); + if (_sleep_us > 0) { + double delay = _sleep_us; + const double a = FLAGS_exception_ratio * 0.5; + if (a >= 0.0001) { + double x = butil::RandDouble(); + if (x < a) { + const double min_sleep_us = FLAGS_min_ratio * _sleep_us; + delay = min_sleep_us + (_sleep_us - min_sleep_us) * x / a; + } else if (x + a > 1) { + const double max_sleep_us = FLAGS_max_ratio * _sleep_us; + delay = _sleep_us + (max_sleep_us - _sleep_us) * (x + a - 1) / a; + } + } + if (FLAGS_spin) { + int64_t end_time = butil::gettimeofday_us() + (int64_t)delay; + while (butil::gettimeofday_us() < end_time) {} + } else { + bthread_usleep((int64_t)delay); + } + } // Echo request and its attachment response->set_message(request->message()); if (FLAGS_echo_attachment) { cntl->response_attachment().append(cntl->request_attachment()); } + _nreq << 1; } + + size_t num_requests() const { return _nreq.get_value(); } + +private: + size_t _index; + int64_t _sleep_us; + bvar::Adder<size_t> _nreq; }; int main(int argc, char* argv[]) { // Parse gflags. We recommend you to use gflags as well. GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); - // Generally you only need one Server. - brpc::Server server; - - // Instance of your service. - EchoServiceImpl echo_service_impl; - - // Add the service into server. Notice the second parameter, because the - // service is put on stack, we don't want server to delete it, otherwise - // use brpc::SERVER_OWNS_SERVICE. - if (server.AddService(&echo_service_impl, - brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { - LOG(ERROR) << "Fail to add service"; + if (FLAGS_server_num <= 0) { + LOG(ERROR) << "server_num must be positive"; return -1; } - // Start the server. + // We need multiple servers in this example. + brpc::Server* servers = new brpc::Server[FLAGS_server_num]; + // For more options see `brpc/server.h'. brpc::ServerOptions options; options.idle_timeout_sec = FLAGS_idle_timeout_s; options.max_concurrency = FLAGS_max_concurrency; - if (server.Start(FLAGS_port, &options) != 0) { - LOG(ERROR) << "Fail to start EchoServer"; - return -1; + + butil::StringSplitter sp(FLAGS_sleep_us.c_str(), ','); + std::vector<int64_t> sleep_list; + for (; sp; ++sp) { + sleep_list.push_back(strtoll(sp.field(), NULL, 10)); + } + if (sleep_list.empty()) { + sleep_list.push_back(0); } - // Wait until Ctrl-C is pressed, then Stop() and Join() the server. - server.RunUntilAskedToQuit(); + // Instance of your services. + EchoServiceImpl* echo_service_impls = new EchoServiceImpl[FLAGS_server_num]; + // Add the service into servers. Notice the second parameter, because the + // service is put on stack, we don't want server to delete it, otherwise + // use brpc::SERVER_OWNS_SERVICE. + for (int i = 0; i < FLAGS_server_num; ++i) { + int64_t sleep_us = sleep_list[(size_t)i < sleep_list.size() ? i : (sleep_list.size() - 1)]; + echo_service_impls[i].set_index(i, sleep_us); + // will be shown on /version page + servers[i].set_version(butil::string_printf( + "example/dynamic_partition_echo_c++[%d]", i)); + if (servers[i].AddService(&echo_service_impls[i], + brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + LOG(ERROR) << "Fail to add service"; + return -1; + } + // Start the server. + int port = FLAGS_port + i; + if (servers[i].Start(port, &options) != 0) { + LOG(ERROR) << "Fail to start EchoServer"; + return -1; + } + } + + // Service logic are running in separate worker threads, for main thread, + // we don't have much to do, just spinning. + std::vector<size_t> last_num_requests(FLAGS_server_num); + while (!brpc::IsAskedToQuit()) { + sleep(1); + + size_t cur_total = 0; + for (int i = 0; i < FLAGS_server_num; ++i) { + const size_t current_num_requests = + echo_service_impls[i].num_requests(); + size_t diff = current_num_requests - last_num_requests[i]; + cur_total += diff; + last_num_requests[i] = current_num_requests; + LOG(INFO) << "S[" << i << "]=" << diff << ' ' << noflush; + } + LOG(INFO) << "[total=" << cur_total << ']'; + } + + // Don't forget to stop and join the server otherwise still-running + // worker threads may crash your program. Clients will have/ at most + // `FLAGS_logoff_ms' to close their connections. If some connections + // still remains after `FLAGS_logoff_ms', they will be closed by force. + for (int i = 0; i < FLAGS_server_num; ++i) { + servers[i].Stop(FLAGS_logoff_ms); + } + for (int i = 0; i < FLAGS_server_num; ++i) { + servers[i].Join(); + } + delete [] servers; + delete [] echo_service_impls; return 0; } diff --git a/example/partition_echo_c++/server_list b/example/partition_echo_c++/server_list index 9e2272f..835f984 100644 --- a/example/partition_echo_c++/server_list +++ b/example/partition_echo_c++/server_list @@ -1,10 +1,9 @@ # You can change following lines when client is running to see how client # deals with partition changes. - 0.0.0.0:8002 1/4 # unmatched num - 0.0.0.0:8002 -1/3 # invalid index - 0.0.0.0:8002 1/3 - 0.0.0.0:8002 1/3 # repeated + 0.0.0.0:8002 1/4 # ignored: unmatched num + 0.0.0.0:8002 -1/3 # ignored: invalid index + 0.0.0.0:8002 1/3 + 0.0.0.0:8002 1/3 # ignored: repeated 0.0.0.0:8002 2/3 - 0.0.0.0:8002 0/3 - + 0.0.0.0:8002 0/3 --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org