This is an automated email from the ASF dual-hosted git repository.

guangmingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new c32ddee0 Support custom modification of sub controllers (#3213)
c32ddee0 is described below

commit c32ddee06ef742179b6f93f3b2adf1a8cb160b3e
Author: Bright Chen <[email protected]>
AuthorDate: Sun Mar 1 16:04:06 2026 +0800

    Support custom modification of sub controllers (#3213)
    
    * Copy http headers from main controller to sub controller
    
    * Support custom modification of sub controllers
---
 docs/cn/combo_channel.md           | 16 ++++++++
 docs/en/combo_channel.md           | 14 +++++++
 src/brpc/parallel_channel.cpp      | 23 ++++++++++--
 src/brpc/parallel_channel.h        | 16 +++++++-
 src/brpc/policy/redis_protocol.cpp |  2 +-
 src/brpc/selective_channel.cpp     | 12 +++---
 src/brpc/socket.cpp                |  3 +-
 test/brpc_channel_unittest.cpp     | 75 ++++++++++++++++++++++++++++++++++----
 test/echo.proto                    |  1 +
 9 files changed, 142 insertions(+), 20 deletions(-)

diff --git a/docs/cn/combo_channel.md b/docs/cn/combo_channel.md
index e11c79b4..fba4f6be 100644
--- a/docs/cn/combo_channel.md
+++ b/docs/cn/combo_channel.md
@@ -60,8 +60,12 @@ public:
                         const google::protobuf::MethodDescriptor* method,
                         const google::protobuf::Message* request,
                         google::protobuf::Message* response) = 0;
+
+    virtual void MapController(int channel_index/*starting from 0*/, int 
channel_count,
+                               const Controller* main_cntl, Controller* 
sub_cntl);
 };
 ```
+### Map
 
 channel_index:该sub channel在ParallelChannel中的位置,从0开始计数。
 
@@ -124,6 +128,18 @@ method/request/response:ParallelChannel.CallMethod()的参数。
   };
 ```
 
+### MapController
+
+channel_index:该sub channel在ParallelChannel中的位置,从0开始计数。
+
+channel_count:ParallelChannel中sub channel的数量。
+
+main_cntl:ParallelChannel.CallMethod()的参数。
+
+sub_cntl:sub 
channel的请求对应的controller。默认实现:拷贝main_cntl的http_request和request_attachment到sub_cntl中。
+
+注意:修改ClientSettings相关配置(如超时、重试等)是无效的,因为所有sub_cntl都是使用main_cntl的ClientSettings配置。
+
 ## ResponseMerger
 
 response_merger把sub 
channel的response合并入总的response,其为NULL时,则使用response->MergeFrom(*sub_response),MergeFrom的行为可概括为“除了合并repeated字段,其余都是覆盖”。如果你需要更复杂的行为,则需实现ResponseMerger。response_merger是一个个执行的,所以你并不需要考虑多个Merge同时运行的情况。response_merger在ParallelChannel析构时被删除。response_merger内含引用计数,一个response_merger可与多个sub
 channel关联。
diff --git a/docs/en/combo_channel.md b/docs/en/combo_channel.md
index 686fad59..ab68188f 100644
--- a/docs/en/combo_channel.md
+++ b/docs/en/combo_channel.md
@@ -63,6 +63,8 @@ public:
 };
 ```
 
+### Map
+
 `channel_index`: The position of the sub channel inside `ParallelChannel`, 
starting from zero.
 
 `channel_count`: The sub channel count inside `ParallelChannel`.
@@ -131,6 +133,18 @@ Common implementations of `Map()` are listed below:
   };
 ```
 
+### MapController
+
+`channel_index`: The position of the sub channel inside `ParallelChannel`, 
starting from zero.
+
+`channel_count`: The sub channel count inside `ParallelChannel`.
+
+`main_cntl`:Parameters to `ParallelChannel::CallMethod()`.
+
+`sub_cntl`:The controller corresponding to the sub-channel's requests. Default 
implementation: Copy the http_request and request_attachment of `main_cntl` to 
the `sub_cntl`.
+
+Note: Modifying `ClientSettings` configurations (such as timeout and retries) 
is ineffective because all sub controllers use the `ClientSettings` 
configuration of `main_cntl`.
+
 ## ResponseMerger
 
 `response_merger` merges responses from all sub channels into one for the 
`ParallelChannel`. When it's NULL, `response->MergeFrom(*sub_response)` is used 
instead, whose behavior can be summarized as "merge repeated fields and 
overwrite the rest". If you need more complex behavior, implement 
`ResponseMerger`. Multiple `response_merger` are called one by one to merge sub 
responses so that you do not need to consider the race conditions between 
merging multiple responses simultaneously. The [...]
diff --git a/src/brpc/parallel_channel.cpp b/src/brpc/parallel_channel.cpp
index 130712bf..de2b86f1 100644
--- a/src/brpc/parallel_channel.cpp
+++ b/src/brpc/parallel_channel.cpp
@@ -612,6 +612,7 @@ void ParallelChannel::CallMethod(
     int ndone = nchan;
     int fail_limit = 1;
     int success_limit = 1;
+    Controller::ClientSettings settings{};
     DEFINE_SMALL_ARRAY(SubCall, aps, nchan, 64);
 
     if (cntl->FailedInline()) {
@@ -718,12 +719,28 @@ void ParallelChannel::CallMethod(
     d->SaveThreadInfoOfCallsite();
     CHECK_EQ(0, bthread_id_unlock(cid));
     // Don't touch `cntl' and `d' again (for async RPC)
-    
+
+    // Apply client settings of _cntl to controllers of sub calls, except
+    // timeout. If we let sub channel do their timeout separately, when
+    // timeout happens, we get ETOOMANYFAILS rather than ERPCTIMEDOUT.
+    cntl->SaveClientSettings(&settings);
+    settings.timeout_ms = -1;
+    for (int i = 0, j = 0; i < nchan; ++i) {
+        if (!aps[i].is_skip()) {
+            ParallelChannelDone::SubDone* sd = d->sub_done(j++);
+            if (NULL != _chans[i].call_mapper) {
+                _chans[i].call_mapper->MapController(i, nchan, cntl, 
&sd->cntl);
+            } else {
+                // Forward the attachment to each sub call.
+                
sd->cntl.request_attachment().append(cntl->request_attachment());
+            }
+            sd->cntl.ApplyClientSettings(settings);
+            sd->cntl.allow_done_to_run_in_place();
+        }
+    }
     for (int i = 0, j = 0; i < nchan; ++i) {
         if (!aps[i].is_skip()) {
             ParallelChannelDone::SubDone* sd = d->sub_done(j++);
-            // Forward the attachment to each sub call
-            sd->cntl.request_attachment().append(cntl->request_attachment());
             _chans[i].chan->CallMethod(sd->ap.method, &sd->cntl,
                                        sd->ap.request, sd->ap.response, sd);
         }
diff --git a/src/brpc/parallel_channel.h b/src/brpc/parallel_channel.h
index 84e5f342..292213c1 100644
--- a/src/brpc/parallel_channel.h
+++ b/src/brpc/parallel_channel.h
@@ -91,6 +91,14 @@ struct SubCall {
 //   }
 //   return SubCall(sub_method, request->sub_request(channel_index),
 //                  response->add_sub_response(), 0);
+// MapController calls to ParallelChannel to sub channels, which can have
+// different controllers.
+// Note:
+// Modifying ClientSettings configurations (such as timeout, retries, etc.)
+// is ineffective because all sub-controllers use the main controller's
+// ClientSettings configuration.
+// Examples:
+// sub_cntl->http_request().SetHeader(...);
 class CallMapper : public SharedObject {
 public:
     virtual SubCall Map(int channel_index/*starting from 0*/,
@@ -98,7 +106,13 @@ public:
                         const google::protobuf::MethodDescriptor* method,
                         const google::protobuf::Message* request,
                         google::protobuf::Message* response) {
-        return Map(channel_index, method, request, response);    
+        return Map(channel_index, method, request, response);
+    }
+
+    virtual void MapController(int channel_index/*starting from 0*/, int 
channel_count,
+                               const Controller* main_cntl, Controller* 
sub_cntl) {
+        // Forward the attachment to each sub call by default.
+        sub_cntl->request_attachment().append(main_cntl->request_attachment());
     }
 
 protected:
diff --git a/src/brpc/policy/redis_protocol.cpp 
b/src/brpc/policy/redis_protocol.cpp
index f8acf49d..9e8e148e 100644
--- a/src/brpc/policy/redis_protocol.cpp
+++ b/src/brpc/policy/redis_protocol.cpp
@@ -283,7 +283,7 @@ void SerializeRedisRequest(butil::IOBuf* buf,
     const RedisRequest* rr = (const RedisRequest*)request;
     // If redis byte size is zero, brpc call will fail with E22. Continuous 
E22 may cause E112 in the end.
     // So set failed and return useful error message
-    if (rr->ByteSize() == 0) {
+    if (GetProtobufByteSize(*rr) == 0) {
         return cntl->SetFailed(EREQUEST, "request byte size is empty");
     }
     // We work around SerializeTo of pb which is just a placeholder.
diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp
index ec933541..567ffa51 100644
--- a/src/brpc/selective_channel.cpp
+++ b/src/brpc/selective_channel.cpp
@@ -344,13 +344,13 @@ int Sender::IssueRPC(int64_t start_realtime_us) {
     sub_cntl->set_request_code(_main_cntl->request_code());
     // Forward request attachment to the subcall
     sub_cntl->request_attachment().append(_main_cntl->request_attachment());
-    sub_cntl->http_request() = _main_cntl->http_request();
+    ProtocolType protocol = _main_cntl->request_protocol();
+    if (PROTOCOL_HTTP == protocol || PROTOCOL_H2 == protocol) {
+        sub_cntl->http_request() = _main_cntl->http_request();
+    }
 
-    sel_out.channel()->CallMethod(_main_cntl->_method,
-                                  &r.sub_done->_cntl,
-                                  _request,
-                                  r.response,
-                                  r.sub_done);
+    sel_out.channel()->CallMethod(_main_cntl->_method, &r.sub_done->_cntl,
+                                  _request, r.response, r.sub_done);
     return 0;
 }
 
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index b132f2ac..c123fb6b 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -896,8 +896,7 @@ void Socket::OnFailed(int error_code, const std::string& 
error_text) {
     // comes online.
     if (HCEnabled()) {
         GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
-        StartHealthCheck(id(),
-            GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
+        StartHealthCheck(id(), 
GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
     }
     // Wake up all threads waiting on EPOLLOUT when closing fd
     _epollout_butex->fetch_add(1, butil::memory_order_relaxed);
diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp
index 66d1fbad..86bee891 100644
--- a/test/brpc_channel_unittest.cpp
+++ b/test/brpc_channel_unittest.cpp
@@ -176,6 +176,16 @@ class MyEchoService : public ::test::EchoService {
             res->add_code_list(req->code());
         }
         res->set_receiving_socket_id(cntl->_current_call.sending_sock->id());
+
+        brpc::ProtocolType protocol = cntl->request_protocol();
+        if ((brpc::PROTOCOL_HTTP == protocol || brpc::PROTOCOL_H2 == protocol) 
&&
+            !req->http_header().empty()) {
+            ASSERT_FALSE(req->http_header().empty());
+            const std::string* val = 
cntl->http_request().GetHeader(req->http_header());
+            ASSERT_TRUE(val);
+            ASSERT_FALSE(val->empty());
+            cntl->http_response().SetHeader(req->http_header(), *val);
+        }
     }
     static void CallAfterRpc(std::shared_ptr<CallAfterRpcObject> str,
                         brpc::Controller* cntl,
@@ -310,8 +320,10 @@ protected:
                       bool short_connection,
                       const brpc::Authenticator* auth = NULL,
                       std::string connection_group = std::string(),
-                      bool use_backup_request_policy = false) {
+                      bool use_backup_request_policy = false,
+                      brpc::ProtocolType protocol = brpc::PROTOCOL_BAIDU_STD) {
         brpc::ChannelOptions opt;
+        opt.protocol = protocol;
         if (short_connection) {
             opt.connection_type = brpc::CONNECTION_TYPE_SHORT;
         }
@@ -526,7 +538,7 @@ protected:
             int channel_index,
             const google::protobuf::MethodDescriptor* method,
             const google::protobuf::Message* req_base,
-            google::protobuf::Message* response) {
+            google::protobuf::Message* response) override {
             test::EchoRequest* req = brpc::Clone<test::EchoRequest>(req_base);
             req->set_code(channel_index + 1/*non-zero*/);
             return brpc::SubCall(method, req, response->New(),
@@ -540,7 +552,7 @@ protected:
             int channel_index,
             const google::protobuf::MethodDescriptor* method,
             const google::protobuf::Message* req_base,
-            google::protobuf::Message* response) {
+            google::protobuf::Message* response) override {
             if (channel_index % 2) {
                 return brpc::SubCall::Skip();
             }
@@ -554,7 +566,7 @@ protected:
             int channel_index,
             const google::protobuf::MethodDescriptor* method,
             const google::protobuf::Message* req_base,
-            google::protobuf::Message* res_base) {
+            google::protobuf::Message* res_base) override {
             const test::ComboRequest* req =
                 dynamic_cast<const test::ComboRequest*>(req_base);
             test::ComboResponse* res = 
dynamic_cast<test::ComboResponse*>(res_base);
@@ -1334,7 +1346,7 @@ protected:
             int /*channel_index*/,
             const google::protobuf::MethodDescriptor* method,
             const google::protobuf::Message* req_base,
-            google::protobuf::Message* response) {
+            google::protobuf::Message* response) override {
             test::EchoRequest* req = brpc::Clone<test::EchoRequest>(req_base);
             req->set_sleep_us(70000); // 70ms
             return brpc::SubCall(method, req, response->New(),
@@ -2357,7 +2369,7 @@ class BadCall : public brpc::CallMapper {
     brpc::SubCall Map(int,
                      const google::protobuf::MethodDescriptor*,
                      const google::protobuf::Message*,
-                     google::protobuf::Message*) {
+                     google::protobuf::Message*) override {
         return brpc::SubCall::Bad();
     }
 };
@@ -2384,7 +2396,7 @@ class SkipCall : public brpc::CallMapper {
     brpc::SubCall Map(int,
                      const google::protobuf::MethodDescriptor*,
                      const google::protobuf::Message*,
-                     google::protobuf::Message*) {
+                     google::protobuf::Message*) override {
         return brpc::SubCall::Skip();
     }
 };
@@ -2412,6 +2424,55 @@ TEST_F(ChannelTest, skip_all_channels) {
     }
 }
 
+static const std::string ECHO_HTTP_HEADER = "echo-http-header";
+
+class EchoHttpHeader : public brpc::CallMapper {
+public:
+    brpc::SubCall Map(int channel_index, int channel_count,
+                      const google::protobuf::MethodDescriptor* method,
+                      const google::protobuf::Message* request,
+                      google::protobuf::Message* response) override {
+        return brpc::SubCall(method, request, response->New(), 
brpc::DELETE_RESPONSE);
+    }
+
+    void MapController(int channel_index, int,
+                       const brpc::Controller* main_cntl,
+                       brpc::Controller* sub_cntl) override {
+        sub_cntl->http_request().SetHeader(ECHO_HTTP_HEADER, 
std::to_string(channel_index));
+    }
+};
+
+TEST_F(ChannelTest, http_header_parallel_channels) {
+    brpc::Server server;
+    MyEchoService service;
+    ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
+    brpc::ServerOptions opt;
+    ASSERT_EQ(0, server.Start(_ep, &opt));
+
+    const size_t NCHANS = 5;
+    brpc::ParallelChannel channel;
+    for (size_t i = 0; i < NCHANS; ++i) {
+        brpc::Channel* sub_chan = new brpc::Channel();
+        SetUpChannel(sub_chan, true, false, NULL, "", false, 
brpc::PROTOCOL_HTTP);
+        ASSERT_EQ(0, channel.AddChannel(sub_chan, brpc::OWNS_CHANNEL, new 
EchoHttpHeader, NULL));
+    }
+
+    brpc::Controller cntl;
+    test::EchoRequest req;
+    test::EchoResponse res;
+    req.set_message(__FUNCTION__);
+    *req.mutable_http_header() = ECHO_HTTP_HEADER;
+    CallMethod(&channel, &cntl, &req, &res, false);
+
+    ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
+    ASSERT_EQ((int)NCHANS, cntl.sub_count());
+    for (int i = 0; i < cntl.sub_count(); ++i) {
+        const brpc::Controller* sub_cntl = cntl.sub(i);
+        ASSERT_TRUE(NULL != sub_cntl) << "i=" << i;
+        ASSERT_EQ(std::to_string(i), 
*sub_cntl->http_response().GetHeader(ECHO_HTTP_HEADER));
+    }
+}
+
 TEST_F(ChannelTest, connection_failed_parallel) {
     for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
         for (int j = 0; j <= 1; ++j) { // Flag Asynchronous
diff --git a/test/echo.proto b/test/echo.proto
index 970ef1db..c9fa8ace 100644
--- a/test/echo.proto
+++ b/test/echo.proto
@@ -27,6 +27,7 @@ message EchoRequest {
     optional bool close_fd = 3;
     optional int32 sleep_us = 4;
     optional int32 server_fail = 5;
+    optional string http_header = 6;
 };
 
 message EchoResponse {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to