This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 15469ed3 Support connect on socket create (#2574)
15469ed3 is described below

commit 15469ed314f22e4cc598e0f0abdde64463e0b768
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Thu Jun 13 12:23:28 2024 +0800

    Support connect on socket create (#2574)
---
 src/brpc/rtmp.cpp                |   2 +-
 src/brpc/selective_channel.cpp   |  20 +++++---
 src/brpc/socket.cpp              |  65 +++++++++++++++++++------
 src/brpc/socket.h                |  27 ++++++++---
 src/brpc/socket_inl.h            |   2 +
 src/brpc/socket_map.cpp          |  17 ++++---
 src/brpc/versioned_ref_with_id.h |   5 ++
 test/brpc_socket_unittest.cpp    |  21 ++++----
 test/brpc_ssl_unittest.cpp       | 102 ++++++++++++++++++++++++++++++++++++---
 9 files changed, 207 insertions(+), 54 deletions(-)

diff --git a/src/brpc/rtmp.cpp b/src/brpc/rtmp.cpp
index ae6eb6ad..4913881c 100644
--- a/src/brpc/rtmp.cpp
+++ b/src/brpc/rtmp.cpp
@@ -1087,7 +1087,7 @@ public:
         : _connect_options(connect_options) {
     }
 
-    int CreateSocket(const SocketOptions& opt, SocketId* id) {
+    int CreateSocket(const SocketOptions& opt, SocketId* id) override {
         SocketOptions sock_opt = opt;
         sock_opt.app_connect = std::make_shared<RtmpConnect>();
         sock_opt.initial_parsing_context = new 
policy::RtmpContext(&_connect_options, NULL);
diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp
index 9ad5f9a0..5a815821 100644
--- a/src/brpc/selective_channel.cpp
+++ b/src/brpc/selective_channel.cpp
@@ -158,8 +158,8 @@ private:
 ChannelBalancer::~ChannelBalancer() {
     for (ChannelToIdMap::iterator
              it = _chan_map.begin(); it != _chan_map.end(); ++it) {
-        SocketUniquePtr ptr(it->second); // Dereference
         it->second->ReleaseAdditionalReference();
+        it->second->ReleaseHCRelatedReference();
     }
     _chan_map.clear();
 }
@@ -196,15 +196,21 @@ int ChannelBalancer::AddChannel(ChannelBase* sub_channel,
         return -1;
     }
     SocketUniquePtr ptr;
-    CHECK_EQ(0, Socket::Address(sock_id, &ptr));
+    int rc = Socket::AddressFailedAsWell(sock_id, &ptr);
+    if (rc < 0 || (rc > 0 && !ptr->HCEnabled())) {
+        LOG(FATAL) << "Fail to address SocketId=" << sock_id;
+        return -1;
+    }
     if (!AddServer(ServerId(sock_id))) {
         LOG(ERROR) << "Duplicated sub_channel=" << sub_channel;
         // sub_chan will be deleted when the socket is recycled.
         ptr->SetFailed();
+        // Cancel health checking.
+        ptr->ReleaseHCRelatedReference();
         return -1;
     }
-    ptr->SetHCRelatedRefHeld(); // set held status
-    _chan_map[sub_channel]= ptr.release();  // Add reference.
+    // The health-check-related reference has been held on created.
+    _chan_map[sub_channel]= ptr.get();
     if (handle) {
         *handle = sock_id;
     }
@@ -223,13 +229,11 @@ void 
ChannelBalancer::RemoveAndDestroyChannel(SelectiveChannel::ChannelHandle ha
             BAIDU_SCOPED_LOCK(_mutex);
             CHECK_EQ(1UL, _chan_map.erase(sub->chan));
         }
-        {
-            ptr->SetHCRelatedRefReleased(); // set released status to cancel 
health checking
-            SocketUniquePtr ptr2(ptr.get()); // Dereference.
-        }
         if (rc == 0) {
             ptr->ReleaseAdditionalReference();
         }
+        // Cancel health checking.
+        ptr->ReleaseHCRelatedReference();
     }
 }
 
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index 85aa1507..ac1c37ae 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -577,6 +577,10 @@ int Socket::ResetFileDescriptor(int fd) {
     if (!ValidFileDescriptor(fd)) {
         return 0;
     }
+    if (_remote_side == butil::EndPoint()) {
+        // OK to fail, non-socket fd does not support this.
+        butil::get_remote_side(fd, &_remote_side);
+    }
     // OK to fail, non-socket fd does not support this.
     if (butil::get_local_side(fd, &_local_side) != 0) {
         _local_side = butil::EndPoint();
@@ -781,6 +785,19 @@ int Socket::OnCreated(const SocketOptions& options) {
     _keepalive_options = options.keepalive_options;
     CHECK(NULL == _write_head.load(butil::memory_order_relaxed));
     _is_write_shutdown = false;
+    int fd = options.fd;
+    if (!ValidFileDescriptor(fd) && options.connect_on_create) {
+        // Connect on create.
+        fd = DoConnect(options.connect_abstime, NULL, NULL);
+        if (fd < 0) {
+            PLOG(ERROR) << "Fail to connect to " << options.remote_side;
+            int error_code = errno != 0 ? errno : EHOSTDOWN;
+            SetFailed(error_code, "Fail to connect to %s: %s",
+                      butil::endpoint2str(options.remote_side).c_str(),
+                      berror(error_code));
+            return -1;
+        }
+    }
     // Must be the last one! Internal fields of this Socket may be accessed
     // just after calling ResetFileDescriptor.
     if (ResetFileDescriptor(options.fd) != 0) {
@@ -790,6 +807,7 @@ int Socket::OnCreated(const SocketOptions& options) {
                      berror(saved_errno));
         return -1;
     }
+    HoldHCRelatedRef();
     guard.dismiss();
 
     return 0;
@@ -940,6 +958,20 @@ std::string Socket::OnDescription() const {
     return result;
 }
 
+void Socket::HoldHCRelatedRef() {
+    if (_health_check_interval_s > 0) {
+        _is_hc_related_ref_held = true;
+        AddReference();
+    }
+}
+
+void Socket::ReleaseHCRelatedReference() {
+    if (_health_check_interval_s > 0) {
+        _is_hc_related_ref_held = false;
+        Dereference();
+    }
+}
+
 int Socket::WaitAndReset(int32_t expected_nref) {
     const uint32_t id_ver = VersionOfVRefId(id());
     uint64_t vref;
@@ -1350,16 +1382,27 @@ int Socket::CheckConnected(int sockfd) {
         return -1;
     }
 
-    butil::EndPoint local_point;
-    CHECK_EQ(0, butil::get_local_side(sockfd, &local_point));
-    LOG_IF(INFO, FLAGS_log_connected)
-            << "Connected to " << remote_side()
-            << " via fd=" << (int)sockfd << " SocketId=" << id()
-            << " local_side=" << local_point;
+    if (FLAGS_log_connected) {
+        butil::EndPoint local_point;
+        CHECK_EQ(0, butil::get_local_side(sockfd, &local_point));
+        LOG(INFO) << "Connected to " << remote_side()
+                  << " via fd=" << (int)sockfd << " SocketId=" << id()
+                  << " local_side=" << local_point;
+    }
+
     // Doing SSL handshake after TCP connected
     return SSLHandshake(sockfd, false);
 }
 
+int Socket::DoConnect(const timespec* abstime,
+                      int (*on_connect)(int, int, void*), void* data) {
+    if (_conn) {
+        return _conn->Connect(this, abstime, on_connect, data);
+    } else {
+        return Connect(abstime, on_connect, data);
+    }
+}
+
 int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) {
     if (_fd.load(butil::memory_order_consume) >= 0) {
        return 0;
@@ -1370,14 +1413,8 @@ int Socket::ConnectIfNot(const timespec* abstime, 
WriteRequest* req) {
     SocketUniquePtr s;
     ReAddress(&s);
     req->set_socket(s.get());
-    if (_conn) {
-        if (_conn->Connect(this, abstime, KeepWriteIfConnected, req) < 0) {
-            return -1;
-        }
-    } else {
-        if (Connect(abstime, KeepWriteIfConnected, req) < 0) {
-            return -1;
-        }
+    if (DoConnect(abstime, KeepWriteIfConnected, req) < 0) {
+        return -1;
     }
     s.release();
     return 1;    
diff --git a/src/brpc/socket.h b/src/brpc/socket.h
index 6d9c8f11..2a6ab748 100644
--- a/src/brpc/socket.h
+++ b/src/brpc/socket.h
@@ -256,6 +256,14 @@ struct SocketOptions {
     // user->BeforeRecycle() before recycling.
     int fd;
     butil::EndPoint remote_side;
+    // If `connect_on_create' is true and `fd' is less than 0,
+    // a client connection will be established to remote_side()
+    // regarding deadline `connect_abstime' when Socket is being created.
+    // Default: false, means that a connection will be established
+    // on first write.
+    bool connect_on_create;
+    // Default: NULL, means no timeout.
+    const timespec* connect_abstime;
     SocketUser* user;
     // When *edge-triggered* events happen on the file descriptor, callback
     // `on_edge_triggered_events' will be called. Inside the callback, user
@@ -409,16 +417,15 @@ public:
 
     // True if health checking is enabled.
     bool HCEnabled() const {
+        // This fence makes sure that we see change of
+        // `_is_hc_related_ref_held' before changing `_versioned_ref.
+        butil::atomic_thread_fence(butil::memory_order_acquire);
         return _health_check_interval_s > 0 && _is_hc_related_ref_held;
     }
 
-    // When someone holds a health-checking-related reference,
-    // this function need to be called to make health checking run normally.
-    void SetHCRelatedRefHeld() { _is_hc_related_ref_held = true; }
-    // When someone releases the health-checking-related reference,
-    // this function need to be called to cancel health checking.
-    void SetHCRelatedRefReleased() { _is_hc_related_ref_held = false; }
-    bool IsHCRelatedRefHeld() const { return _is_hc_related_ref_held; }
+    // Release the health-checking-related
+    // reference which is held on created.
+    void ReleaseHCRelatedReference();
 
     // After health checking is complete, set _hc_started to false.
     void AfterHCCompleted() { _hc_started.store(false, 
butil::memory_order_relaxed); }
@@ -665,6 +672,9 @@ private:
 
     std::string OnDescription() const;
 
+    // Hold the health-checking-related
+    // reference on created.
+    void HoldHCRelatedRef();
 
     static int Status(SocketId, int32_t* nref = NULL);  // for unit-test.
 
@@ -699,8 +709,11 @@ private:
     // starting a connection request and `on_connect' will be called
     // when connecting completes (whether it succeeds or not)
     // Returns the socket fd on success, -1 otherwise
+    int DoConnect(const timespec* abstime,
+                  int (*on_connect)(int fd, int err, void* data), void* data);
     int Connect(const timespec* abstime,
                 int (*on_connect)(int fd, int err, void* data), void* data);
+
     int CheckConnected(int sockfd);
 
     // [Not thread-safe] Only used by `Write'.
diff --git a/src/brpc/socket_inl.h b/src/brpc/socket_inl.h
index a8ff3ce8..d704b900 100644
--- a/src/brpc/socket_inl.h
+++ b/src/brpc/socket_inl.h
@@ -25,6 +25,8 @@ namespace brpc {
 
 inline SocketOptions::SocketOptions()
     : fd(-1)
+    , connect_on_create(false)
+    , connect_abstime(NULL)
     , user(NULL)
     , on_edge_triggered_events(NULL)
     , health_check_interval_s(-1)
diff --git a/src/brpc/socket_map.cpp b/src/brpc/socket_map.cpp
index 774bf5a7..609d2776 100644
--- a/src/brpc/socket_map.cpp
+++ b/src/brpc/socket_map.cpp
@@ -58,7 +58,7 @@ static butil::static_atomic<SocketMap*> g_socket_map = 
BUTIL_STATIC_ATOMIC_INIT(
 
 class GlobalSocketCreator : public SocketCreator {
 public:
-    int CreateSocket(const SocketOptions& opt, SocketId* id) {
+    int CreateSocket(const SocketOptions& opt, SocketId* id) override {
         SocketOptions sock_opt = opt;
         sock_opt.health_check_interval_s = FLAGS_health_check_interval;
         return get_client_side_messenger()->Create(sock_opt, id);
@@ -237,8 +237,7 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* id,
             return 0;
         }
         // A socket w/o HC is failed (permanently), replace it.
-        sc->socket->SetHCRelatedRefReleased(); // set released status to 
cancel health checking
-        SocketUniquePtr ptr(sc->socket);  // Remove the ref added at insertion.
+        sc->socket->ReleaseHCRelatedReference();
         _map.erase(key); // in principle, we can override the entry in map w/o
         // removing and inserting it again. But this would make error branches
         // below have to remove the entry before returning, which is
@@ -258,12 +257,15 @@ int SocketMap::Insert(const SocketMapKey& key, SocketId* 
id,
     // use SocketUniquePtr which cannot put into containers before c++11.
     // The ref will be removed at entry's removal.
     SocketUniquePtr ptr;
-    if (Socket::Address(tmp_id, &ptr) != 0) {
+    int rc = Socket::AddressFailedAsWell(tmp_id, &ptr);
+    if (rc < 0) {
         LOG(FATAL) << "Fail to address SocketId=" << tmp_id;
         return -1;
+    } else if (rc > 0 && !ptr->HCEnabled()) {
+        LOG(FATAL) << "Failed socket is not HC-enabled";
+        return -1;
     }
-    ptr->SetHCRelatedRefHeld(); // set held status
-    SingleConnection new_sc = { 1, ptr.release(), 0 };
+    SingleConnection new_sc = { 1, ptr.get(), 0 };
     _map[key] = new_sc;
     *id = tmp_id;
     mu.unlock();
@@ -301,8 +303,7 @@ void SocketMap::RemoveInternal(const SocketMapKey& key,
             _map.erase(key);
             mu.unlock();
             s->ReleaseAdditionalReference(); // release extra ref
-            s->SetHCRelatedRefReleased(); // set released status to cancel 
health checking
-            SocketUniquePtr ptr(s);  // Dereference
+            s->ReleaseHCRelatedReference();
         }
     }
 }
diff --git a/src/brpc/versioned_ref_with_id.h b/src/brpc/versioned_ref_with_id.h
index 38141f3d..c78f019b 100644
--- a/src/brpc/versioned_ref_with_id.h
+++ b/src/brpc/versioned_ref_with_id.h
@@ -296,6 +296,11 @@ friend void DereferenceVersionedRefWithId<>(T* r);
     // it will be recycled automatically and T::BeforeRecycled() will be 
called.
     int Dereference();
 
+    // Increase the reference count by 1.
+    void AddReference() {
+        _versioned_ref.fetch_add(1, butil::memory_order_release);
+    }
+
     // Make this socket addressable again.
     // If nref is less than `at_least_nref', VersionedRefWithId was
     // abandoned during revival and cannot be revived.
diff --git a/test/brpc_socket_unittest.cpp b/test/brpc_socket_unittest.cpp
index f278c46b..e83eef7e 100644
--- a/test/brpc_socket_unittest.cpp
+++ b/test/brpc_socket_unittest.cpp
@@ -504,7 +504,6 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
     {
         brpc::SocketUniquePtr s;
         ASSERT_EQ(0, brpc::Socket::Address(id, &s));
-        s->SetHCRelatedRefHeld(); // set held status
         global_sock = s.get();
         ASSERT_TRUE(s.get());
         ASSERT_EQ(-1, s->fd());
@@ -542,6 +541,7 @@ TEST_F(SocketTest, not_health_check_when_nref_hits_0) {
 #endif
         ASSERT_TRUE(src.empty());
         ASSERT_EQ(-1, s->fd());
+        s->ReleaseHCRelatedReference();
     }
     // StartHealthCheck is possibly still running. Spin until global_sock
     // is NULL(set in CheckRecycle::BeforeRecycle). Notice that you should
@@ -650,12 +650,14 @@ TEST_F(SocketTest, health_check) {
     options.user = new CheckRecycle;
     options.health_check_interval_s = kCheckInteval/*s*/;
     ASSERT_EQ(0, brpc::Socket::Create(options, &id));
-    brpc::SocketUniquePtr s;
-    ASSERT_EQ(0, brpc::Socket::Address(id, &s));
-
-    s->SetHCRelatedRefHeld(); // set held status
-    global_sock = s.get();
-    ASSERT_TRUE(s.get());
+    brpc::Socket* s = NULL;
+    {
+        brpc::SocketUniquePtr ptr;
+        ASSERT_EQ(0, brpc::Socket::Address(id, &ptr));
+        s = ptr.get();
+    }
+    global_sock = s;
+    ASSERT_NE(nullptr, s);
     ASSERT_EQ(-1, s->fd());
     ASSERT_EQ(point, s->remote_side());
     ASSERT_EQ(id, s->id());
@@ -763,7 +765,7 @@ TEST_F(SocketTest, health_check) {
         ASSERT_NE(0, ptr->fd());
     }
 
-    s.release()->Dereference();
+    s->ReleaseHCRelatedReference();
 
     // Must stop messenger before SetFailed the id otherwise StartHealthCheck
     // still has chance to get reconnected and revive the id.
@@ -779,7 +781,8 @@ TEST_F(SocketTest, health_check) {
         bthread_usleep(1000);
         ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L);
     }
-    ASSERT_EQ(-1, brpc::Socket::Status(id));
+    nref = 0;
+    ASSERT_EQ(-1, brpc::Socket::Status(id, &nref)) << "nref=" << nref;
     // The id is invalid.
     brpc::SocketUniquePtr ptr;
     ASSERT_EQ(-1, brpc::Socket::Address(id, &ptr));
diff --git a/test/brpc_ssl_unittest.cpp b/test/brpc_ssl_unittest.cpp
index e101f534..3c64ba76 100644
--- a/test/brpc_ssl_unittest.cpp
+++ b/test/brpc_ssl_unittest.cpp
@@ -26,6 +26,9 @@
 #include <butil/macros.h>
 #include <butil/fd_guard.h>
 #include <butil/files/scoped_file.h>
+#include <brpc/policy/baidu_rpc_meta.pb.h>
+#include <brpc/policy/baidu_rpc_protocol.h>
+#include <brpc/policy/most_common_message.h>
 #include "brpc/global.h"
 #include "brpc/socket.h"
 #include "brpc/server.h"
@@ -54,11 +57,11 @@ const std::string EXP_RESPONSE = "world";
 class EchoServiceImpl : public test::EchoService {
 public:
     EchoServiceImpl() : count(0) {}
-    virtual ~EchoServiceImpl() { g_delete = true; }
-    virtual void Echo(google::protobuf::RpcController* cntl_base,
-                      const test::EchoRequest* request,
-                      test::EchoResponse* response,
-                      google::protobuf::Closure* done) {
+    ~EchoServiceImpl() override { g_delete = true; }
+    void Echo(google::protobuf::RpcController* cntl_base,
+              const test::EchoRequest* request,
+              test::EchoResponse* response,
+              google::protobuf::Closure* done) override {
         brpc::ClosureGuard done_guard(done);
         brpc::Controller* cntl = (brpc::Controller*)cntl_base;
         count.fetch_add(1, butil::memory_order_relaxed);
@@ -207,7 +210,7 @@ TEST_F(SSLTest, force_ssl) {
         test::EchoService_Stub stub(&channel);
         test::EchoResponse res;
         stub.Echo(&cntl, &req, &res, NULL);
-        EXPECT_EQ(EXP_RESPONSE, res.message()) << cntl.ErrorText();
+        ASSERT_EQ(EXP_RESPONSE, res.message()) << cntl.ErrorText();
     }
 
     {
@@ -218,13 +221,98 @@ TEST_F(SSLTest, force_ssl) {
         test::EchoService_Stub stub(&channel);
         test::EchoResponse res;
         stub.Echo(&cntl, &req, &res, NULL);
-        EXPECT_TRUE(cntl.Failed());
+        ASSERT_TRUE(cntl.Failed());
     }
 
     ASSERT_EQ(0, server.Stop(0));
     ASSERT_EQ(0, server.Join());
 }
 
+void ProcessResponse(brpc::InputMessageBase* msg_base) {
+    brpc::DestroyingPtr<brpc::policy::MostCommonMessage> msg(
+        static_cast<brpc::policy::MostCommonMessage*>(msg_base));
+    brpc::policy::RpcMeta meta;
+    ASSERT_TRUE(brpc::ParsePbFromIOBuf(&meta, msg->meta));
+    const brpc::policy::RpcResponseMeta &response_meta = meta.response();
+    ASSERT_EQ(0, response_meta.error_code()) << response_meta.error_text();
+
+    const brpc::CallId cid = { static_cast<uint64_t>(meta.correlation_id()) };
+    brpc::Controller* cntl = NULL;
+    ASSERT_EQ(0, bthread_id_lock(cid, (void**)&cntl));
+    ASSERT_NE(nullptr, cntl);
+    ASSERT_TRUE(brpc::ParsePbFromIOBuf(cntl->response(), msg->payload));
+    ASSERT_EQ(0, bthread_id_unlock_and_destroy(cid));
+}
+
+TEST_F(SSLTest, connect_on_create) {
+    brpc::Protocol dummy_protocol = {
+        brpc::policy::ParseRpcMessage, brpc::SerializeRequestDefault,
+        brpc::policy::PackRpcRequest,NULL, ProcessResponse,
+        NULL, NULL, NULL, brpc::CONNECTION_TYPE_ALL, "ssl_ut_baidu"
+    };
+    ASSERT_EQ(0, RegisterProtocol((brpc::ProtocolType)30, dummy_protocol));
+
+    brpc::InputMessageHandler dummy_handler ={
+        dummy_protocol.parse, dummy_protocol.process_response,
+        NULL, NULL, dummy_protocol.name
+    };
+    brpc::InputMessenger messenger;
+    ASSERT_EQ(0, messenger.AddHandler(dummy_handler));
+
+    const int port = 8613;
+    brpc::Server server;
+    brpc::ServerOptions server_options;
+    server_options.force_ssl = true;
+
+    brpc::CertInfo cert;
+    cert.certificate = "cert1.crt";
+    cert.private_key = "cert1.key";
+    server_options.mutable_ssl_options()->default_cert = cert;
+
+    EchoServiceImpl echo_svc;
+    ASSERT_EQ(0, server.AddService(
+        &echo_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
+    ASSERT_EQ(0, server.Start(port, &server_options));
+
+    // Create client socket.
+    brpc::SocketOptions socket_options;
+    butil::EndPoint ep(butil::IP_ANY, port);
+    socket_options.remote_side = ep;
+    socket_options.connect_on_create = true;
+    socket_options.on_edge_triggered_events = 
brpc::InputMessenger::OnNewMessages;
+    socket_options.user = &messenger;
+    brpc::ChannelSSLOptions ssl_options;
+    SSL_CTX* raw_ctx = brpc::CreateClientSSLContext(ssl_options);
+    ASSERT_NE(nullptr, raw_ctx);
+    std::shared_ptr<brpc::SocketSSLContext> ssl_ctx
+        = std::make_shared<brpc::SocketSSLContext>();
+    ssl_ctx->raw_ctx = raw_ctx;
+    socket_options.initial_ssl_ctx = ssl_ctx;
+
+    brpc::SocketId socket_id;
+    ASSERT_EQ(0, brpc::Socket::Create(socket_options, &socket_id));
+    brpc::SocketUniquePtr ptr;
+    ASSERT_EQ(0, brpc::Socket::Address(socket_id, &ptr));
+
+    test::EchoRequest req;
+    req.set_message(EXP_REQUEST);
+    for (int i = 0; i < 100; ++i) {
+        test::EchoResponse res;
+        butil::IOBuf request_buf;
+        butil::IOBuf request_body;
+        brpc::Controller cntl;
+        cntl._response = &res;
+        const brpc::CallId correlation_id = cntl.call_id();
+        brpc::SerializeRequestDefault(&request_body, &cntl, &req);
+        brpc::policy::PackRpcRequest(&request_buf, NULL, correlation_id.value,
+                                     
test::EchoService_Stub::descriptor()->method(0),
+                                     &cntl, request_body, NULL);
+        ASSERT_EQ(0, ptr->Write(&request_buf));
+        brpc::Join(correlation_id);
+        ASSERT_EQ(EXP_RESPONSE, res.message());
+    }
+}
+
 void CheckCert(const char* cname, const char* cert) {
     const int port = 8613;
     brpc::Channel channel;


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to