This is an automated email from the ASF dual-hosted git repository.

jamesge pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bc9eaa  replace example/partition_echo_c++/server.cpp with that in dynamic_partition_echo_c++
     new 1430306  Merge pull request #792 from zyearn/optimize_server_in_partition_channel_ex
0bc9eaa is described below

commit 0bc9eaa36383021eb5da73d572600ade881d8817
Author: zhujiashun <zhujiashun2...@gmail.com>
AuthorDate: Fri May 31 15:31:36 2019 +0800

    replace example/partition_echo_c++/server.cpp with that in dynamic_partition_echo_c++
---
 example/partition_echo_c++/server.cpp  | 139 +++++++++++++++++++++++++++------
 example/partition_echo_c++/server_list |  11 ++-
 2 files changed, 120 insertions(+), 30 deletions(-)

diff --git a/example/partition_echo_c++/server.cpp b/example/partition_echo_c++/server.cpp
index bd42714..2e0b7b1 100644
--- a/example/partition_echo_c++/server.cpp
+++ b/example/partition_echo_c++/server.cpp
@@ -14,9 +14,13 @@
 
 // A server to receive EchoRequest and send back EchoResponse.
 
+#include <vector>
 #include <gflags/gflags.h>
+#include <butil/time.h>
 #include <butil/logging.h>
 #include <butil/string_printf.h>
+#include <butil/string_splitter.h>
+#include <butil/rand_util.h>
 #include <brpc/server.h>
 #include "echo.pb.h"
 
@@ -27,57 +31,144 @@ DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
 DEFINE_int32(logoff_ms, 2000, "Maximum duration of server's LOGOFF state "
              "(waiting for client to close connection before server stops)");
 DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
+DEFINE_int32(server_num, 1, "Number of servers");
+DEFINE_string(sleep_us, "", "Sleep so many microseconds before responding");
+DEFINE_bool(spin, false, "spin rather than sleep");
+DEFINE_double(exception_ratio, 0.1, "Percentage of irregular latencies");
+DEFINE_double(min_ratio, 0.2, "min_sleep / sleep_us");
+DEFINE_double(max_ratio, 10, "max_sleep / sleep_us");
 
 // Your implementation of example::EchoService
 class EchoServiceImpl : public example::EchoService {
 public:
-    EchoServiceImpl() {}
-    ~EchoServiceImpl() {};
-    void Echo(google::protobuf::RpcController* cntl_base,
-              const example::EchoRequest* request,
-              example::EchoResponse* response,
-              google::protobuf::Closure* done) {
+    EchoServiceImpl() : _index(0) {}
+    virtual ~EchoServiceImpl() {};
+    void set_index(size_t index, int64_t sleep_us) { 
+        _index = index; 
+        _sleep_us = sleep_us;
+    }
+    virtual void Echo(google::protobuf::RpcController* cntl_base,
+                      const example::EchoRequest* request,
+                      example::EchoResponse* response,
+                      google::protobuf::Closure* done) {
         brpc::ClosureGuard done_guard(done);
         brpc::Controller* cntl =
             static_cast<brpc::Controller*>(cntl_base);
+        if (_sleep_us > 0) {
+            double delay = _sleep_us;
+            const double a = FLAGS_exception_ratio * 0.5;
+            if (a >= 0.0001) {
+                double x = butil::RandDouble();
+                if (x < a) {
+                    const double min_sleep_us = FLAGS_min_ratio * _sleep_us;
+                    delay = min_sleep_us + (_sleep_us - min_sleep_us) * x / a;
+                } else if (x + a > 1) {
+                    const double max_sleep_us = FLAGS_max_ratio * _sleep_us;
+                    delay = _sleep_us + (max_sleep_us - _sleep_us) * (x + a - 1) / a;
+                }
+            }
+            if (FLAGS_spin) {
+                int64_t end_time = butil::gettimeofday_us() + (int64_t)delay;
+                while (butil::gettimeofday_us() < end_time) {}
+            } else {
+                bthread_usleep((int64_t)delay);
+            }
+        }
 
         // Echo request and its attachment
         response->set_message(request->message());
         if (FLAGS_echo_attachment) {
             cntl->response_attachment().append(cntl->request_attachment());
         }
+        _nreq << 1;
     }
+
+    size_t num_requests() const { return _nreq.get_value(); }
+
+private:
+    size_t _index;
+    int64_t _sleep_us;
+    bvar::Adder<size_t> _nreq;
 };
 
 int main(int argc, char* argv[]) {
     // Parse gflags. We recommend you to use gflags as well.
     GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
 
-    // Generally you only need one Server.
-    brpc::Server server;
-
-    // Instance of your service.
-    EchoServiceImpl echo_service_impl;
-
-    // Add the service into server. Notice the second parameter, because the
-    // service is put on stack, we don't want server to delete it, otherwise
-    // use brpc::SERVER_OWNS_SERVICE.
-    if (server.AddService(&echo_service_impl, 
-                          brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
-        LOG(ERROR) << "Fail to add service";
+    if (FLAGS_server_num <= 0) {
+        LOG(ERROR) << "server_num must be positive";
         return -1;
     }
 
-    // Start the server. 
+    // We need multiple servers in this example.
+    brpc::Server* servers = new brpc::Server[FLAGS_server_num];
+    // For more options see `brpc/server.h'.
     brpc::ServerOptions options;
     options.idle_timeout_sec = FLAGS_idle_timeout_s;
     options.max_concurrency = FLAGS_max_concurrency;
-    if (server.Start(FLAGS_port, &options) != 0) {
-        LOG(ERROR) << "Fail to start EchoServer";
-        return -1;
+
+    butil::StringSplitter sp(FLAGS_sleep_us.c_str(), ',');
+    std::vector<int64_t> sleep_list;
+    for (; sp; ++sp) {
+        sleep_list.push_back(strtoll(sp.field(), NULL, 10));
+    }
+    if (sleep_list.empty()) {
+        sleep_list.push_back(0);
     }
 
-    // Wait until Ctrl-C is pressed, then Stop() and Join() the server.
-    server.RunUntilAskedToQuit();
+    // Instance of your services.
+    EchoServiceImpl* echo_service_impls = new EchoServiceImpl[FLAGS_server_num];
+    // Add the service into servers. Notice the second parameter, because the
+    // service is put on stack, we don't want server to delete it, otherwise
+    // use brpc::SERVER_OWNS_SERVICE.
+    for (int i = 0; i < FLAGS_server_num; ++i) {
+        int64_t sleep_us = sleep_list[(size_t)i < sleep_list.size() ? i : (sleep_list.size() - 1)];
+        echo_service_impls[i].set_index(i, sleep_us);
+        // will be shown on /version page
+        servers[i].set_version(butil::string_printf(
+                    "example/dynamic_partition_echo_c++[%d]", i));
+        if (servers[i].AddService(&echo_service_impls[i], 
+                                  brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
+            LOG(ERROR) << "Fail to add service";
+            return -1;
+        }
+        // Start the server.
+        int port = FLAGS_port + i;
+        if (servers[i].Start(port, &options) != 0) {
+            LOG(ERROR) << "Fail to start EchoServer";
+            return -1;
+        }
+    }
+
+    // Service logic are running in separate worker threads, for main thread,
+    // we don't have much to do, just spinning.
+    std::vector<size_t> last_num_requests(FLAGS_server_num);
+    while (!brpc::IsAskedToQuit()) {
+        sleep(1);
+
+        size_t cur_total = 0;
+        for (int i = 0; i < FLAGS_server_num; ++i) {
+            const size_t current_num_requests =
+                    echo_service_impls[i].num_requests();
+            size_t diff = current_num_requests - last_num_requests[i];
+            cur_total += diff;
+            last_num_requests[i] = current_num_requests;
+            LOG(INFO) << "S[" << i << "]=" << diff << ' ' << noflush;
+        }
+        LOG(INFO) << "[total=" << cur_total << ']';
+    }
+
+    // Don't forget to stop and join the server otherwise still-running
+    // worker threads may crash your program. Clients will have/ at most
+    // `FLAGS_logoff_ms' to close their connections. If some connections
+    // still remains after `FLAGS_logoff_ms', they will be closed by force.
+    for (int i = 0; i < FLAGS_server_num; ++i) {
+        servers[i].Stop(FLAGS_logoff_ms);
+    }
+    for (int i = 0; i < FLAGS_server_num; ++i) {
+        servers[i].Join();
+    }
+    delete [] servers;
+    delete [] echo_service_impls;
     return 0;
 }
diff --git a/example/partition_echo_c++/server_list b/example/partition_echo_c++/server_list
index 9e2272f..835f984 100644
--- a/example/partition_echo_c++/server_list
+++ b/example/partition_echo_c++/server_list
@@ -1,10 +1,9 @@
 # You can change following lines when client is running to see how client 
 # deals with partition changes.
 
- 0.0.0.0:8002  1/4          # unmatched num
-  0.0.0.0:8002    -1/3      # invalid index
-  0.0.0.0:8002  1/3        
- 0.0.0.0:8002   1/3         # repeated
+ 0.0.0.0:8002  1/4          # ignored: unmatched num
+ 0.0.0.0:8002  -1/3         # ignored: invalid index
+ 0.0.0.0:8002  1/3
+ 0.0.0.0:8002  1/3          # ignored: repeated
  0.0.0.0:8002  2/3
- 0.0.0.0:8002   0/3
-
+ 0.0.0.0:8002  0/3


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to