wwbmmm commented on code in PR #3163:
URL: https://github.com/apache/brpc/pull/3163#discussion_r2558902604


##########
src/brpc/controller.h:
##########
@@ -323,7 +325,19 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
 
     // Make the RPC end when the HTTP response has complete headers and let
     // user read the remaining body by using ReadProgressiveAttachmentBy().
-    void response_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
+    void response_will_be_read_progressively() {
+        if(has_flag(FLAGS_READ_PROGRESSIVELY) && _progressive_read_idle_tid > 0) {
+            return;
+        }
+        bthread_attr_t tmp = BTHREAD_ATTR_NORMAL;
+        tmp.tag = _bthread_tag;
+        if(bthread_start_background(&_progressive_read_idle_tid, &tmp, HandleIdleProgressiveReader, this) != 0){
+            LOG(FATAL) << "Failed to start controller bthread id : " << _progressive_read_idle_tid;
+        }
+        LOG(INFO) << "Start Response progressive reader idle checker close idle_tid : " << _progressive_read_idle_tid

Review Comment:
   This looks like a debug log; it should not be at INFO level.



##########
src/brpc/controller.h:
##########
@@ -837,6 +851,11 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
     int32_t _timeout_ms;
     int32_t _connect_timeout_ms;
     int32_t _backup_request_ms;
+    int32_t _progressive_read_timeout_ms;
+    butil::FlatSet<SocketId> _checking_progressive_read_fds;
+    bthread_t _progressive_read_idle_tid;

Review Comment:
   This bthread is never joined, which will cause a memory leak.
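
   A minimal sketch of the stop-then-join pattern this comment asks for. It
   uses std::thread as a stand-in so that it compiles without brpc; the class
   name IdleChecker and its members are hypothetical and not part of the PR.
   In the Controller the equivalent would presumably be a bthread_stop() followed
   by a bthread_join() on _progressive_read_idle_tid before the Controller is
   destroyed or reset.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical stand-in for the idle checker. std::thread replaces bthread
// here only so the sketch is self-contained.
class IdleChecker {
public:
    IdleChecker() : _stop(false), _ticks(0), _worker([this] { Run(); }) {}

    // Ask the worker to exit, then wait for it. Without the join, the worker
    // could outlive this object and touch freed members.
    ~IdleChecker() {
        _stop.store(true);
        if (_worker.joinable()) {
            _worker.join();
        }
    }

    int ticks() const { return _ticks.load(); }

private:
    void Run() {
        while (!_stop.load()) {
            ++_ticks;  // placeholder for one idle-check pass
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    std::atomic<bool> _stop;
    std::atomic<int> _ticks;
    std::thread _worker;  // declared last so the flags exist before it starts
};
```

   The point of the sketch is the ordering in the destructor: signal first,
   then join, so the background task can never observe a destroyed owner.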



##########
src/brpc/controller.cpp:
##########
@@ -1179,6 +1226,11 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
         // Tag the socket so that when the response comes back, the parser will
         // stop before reading all body.
         _current_call.sending_sock->read_will_be_progressive(_connection_type);
+        auto socket_id = _current_call.sending_sock->id();
+        if (_progressive_read_timeout_ms > 0 && _checking_progressive_read_fds.seek(socket_id) == NULL) {
+            _checking_progressive_read_fds.insert(socket_id);
+            LOG(INFO) << "insert the progressive read fd : " << socket_id << " socket fds size : " << _checking_progressive_read_fds.size();

Review Comment:
   This looks like a debug log; it should not be at INFO level.



##########
src/brpc/controller.h:
##########
@@ -323,7 +325,19 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
 
     // Make the RPC end when the HTTP response has complete headers and let
     // user read the remaining body by using ReadProgressiveAttachmentBy().
-    void response_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
+    void response_will_be_read_progressively() {
+        if(has_flag(FLAGS_READ_PROGRESSIVELY) && _progressive_read_idle_tid > 0) {
+            return;
+        }
+        bthread_attr_t tmp = BTHREAD_ATTR_NORMAL;
+        tmp.tag = _bthread_tag;
+        if(bthread_start_background(&_progressive_read_idle_tid, &tmp, HandleIdleProgressiveReader, this) != 0){

Review Comment:
   You only need to start a bthread here when the user has set
   progressive_read_timeout_ms.



##########
src/brpc/controller.cpp:
##########
@@ -1027,6 +1037,43 @@ void Controller::SubmitSpan() {
     _span = NULL;
 }
 
+void* Controller::HandleIdleProgressiveReader(void* arg) {
+    auto* cntl = static_cast<Controller*>(arg);
+    const uint64_t CHECK_INTERVAL_US = 1000000UL;
+    auto log_idle = FLAGS_log_idle_progressive_read_close;
+    std::vector<SocketId> remove_socket_ids;
+    while (bthread_usleep(CHECK_INTERVAL_US) == 0) {
+        // TODO: this is not efficient for a lot of connections(>100K)

Review Comment:
   In what scenarios will a controller have >100K connections?



##########
src/brpc/controller.h:
##########
@@ -837,6 +851,11 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
     int32_t _timeout_ms;
     int32_t _connect_timeout_ms;
     int32_t _backup_request_ms;
+    int32_t _progressive_read_timeout_ms;
+    butil::FlatSet<SocketId> _checking_progressive_read_fds;
+    bthread_t _progressive_read_idle_tid;
+    // Controller belongs to this tag
+    bthread_tag_t _bthread_tag = bthread_self_tag();

Review Comment:
   Initializing _bthread_tag here is not suitable: bthread_self_tag() is
   evaluated when the Controller is constructed, but the constructing thread
   may not be the thread that issues the RPC.
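
   To illustrate the concern, here is a small sketch in plain C++. It is an
   analogy, not brpc code: std::thread::id stands in for bthread_tag_t, and the
   names EagerTag, LazyTag and TagMatchesIssuer are hypothetical. A default
   member initializer runs on whichever thread constructs the object, so the
   captured value can differ from the one seen by the thread that later uses
   the object.

```cpp
#include <thread>

// Captures the thread identity at construction time, like the default member
// initializer in the patch does with bthread_self_tag().
struct EagerTag {
    std::thread::id tag = std::this_thread::get_id();
    std::thread::id TagForIssue() const { return tag; }
};

// Reads the thread identity at the moment it is needed instead.
struct LazyTag {
    std::thread::id TagForIssue() const { return std::this_thread::get_id(); }
};

// Constructs a T on a helper thread, then asks for the tag on the calling
// thread. Returns true if the reported tag belongs to the caller.
template <typename T>
bool TagMatchesIssuer() {
    T* obj = nullptr;
    std::thread creator([&obj] { obj = new T(); });
    creator.join();
    const bool same = (obj->TagForIssue() == std::this_thread::get_id());
    delete obj;
    return same;
}
```

   Under these assumptions, TagMatchesIssuer<EagerTag>() is false and
   TagMatchesIssuer<LazyTag>() is true, which is why reading the tag at issue
   time looks like the safer choice.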



##########
src/brpc/controller.h:
##########
@@ -837,6 +851,11 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
     int32_t _timeout_ms;
     int32_t _connect_timeout_ms;
     int32_t _backup_request_ms;
+    int32_t _progressive_read_timeout_ms;
+    butil::FlatSet<SocketId> _checking_progressive_read_fds;

Review Comment:
   You may not need this FlatSet: the existing connections can be found
   through the Controller's _current_call and _unfinished_call. I think there
   are at most 2 connections in one Controller.



##########
src/brpc/controller.cpp:
##########
@@ -1027,6 +1037,43 @@ void Controller::SubmitSpan() {
     _span = NULL;
 }
 
+void* Controller::HandleIdleProgressiveReader(void* arg) {
+    auto* cntl = static_cast<Controller*>(arg);
+    const uint64_t CHECK_INTERVAL_US = 1000000UL;

Review Comment:
   It seems that the check interval is 1s, but the resolution of
   progressive_read_timeout is 1ms, so a timeout shorter than 1s cannot be
   enforced accurately.
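
   One possible way to reconcile the two, sketched below, is to derive the
   sleep interval from the configured timeout instead of hard-coding 1s. The
   helper CheckIntervalUs and its two bounds are assumptions made for this
   illustration; nothing like it exists in the PR.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical helper: how long the checker sleeps between scans, in
// microseconds. The interval follows the timeout, bounded below by 1ms
// (the timeout's resolution) and above by the 1s used in the patch.
inline int64_t CheckIntervalUs(int32_t timeout_ms) {
    const int64_t kMinIntervalUs = 1000;     // 1ms
    const int64_t kMaxIntervalUs = 1000000;  // 1s
    const int64_t timeout_us = static_cast<int64_t>(timeout_ms) * 1000;
    return std::min(kMaxIntervalUs, std::max(kMinIntervalUs, timeout_us));
}
```

   With a fixed 1s scan, a 50ms timeout may be noticed almost a second late;
   with this helper the scan for that timeout would run every 50ms, while
   timeouts of 1s or more would keep the original 1s scan.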



##########
src/brpc/controller.cpp:
##########
@@ -1027,6 +1037,43 @@ void Controller::SubmitSpan() {
     _span = NULL;
 }
 
+void* Controller::HandleIdleProgressiveReader(void* arg) {
+    auto* cntl = static_cast<Controller*>(arg);
+    const uint64_t CHECK_INTERVAL_US = 1000000UL;
+    auto log_idle = FLAGS_log_idle_progressive_read_close;
+    std::vector<SocketId> remove_socket_ids;
+    while (bthread_usleep(CHECK_INTERVAL_US) == 0) {
+        // TODO: this is not efficient for a lot of connections(>100K)
+        auto socketIds = cntl->_checking_progressive_read_fds;

Review Comment:
   Do you need to lock the Controller when you access 
`cntl->_checking_progressive_read_fds`?
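
   If a lock does turn out to be needed, a common shape is to copy the set
   under a mutex and scan the copy outside it. The sketch below is only an
   illustration: TrackedSockets is a hypothetical class, std::set replaces
   butil::FlatSet, and uint64_t is assumed as the id type so that the code
   compiles without brpc.

```cpp
#include <cstdint>
#include <mutex>
#include <set>
#include <vector>

// Hypothetical container for the ids the checker has to scan.
class TrackedSockets {
public:
    void Insert(uint64_t id) {
        std::lock_guard<std::mutex> guard(_mu);
        _ids.insert(id);
    }

    void Erase(uint64_t id) {
        std::lock_guard<std::mutex> guard(_mu);
        _ids.erase(id);
    }

    // Returns a copy taken under the lock, in ascending order. The caller can
    // iterate it without holding the lock while other threads keep mutating
    // the set.
    std::vector<uint64_t> Snapshot() const {
        std::lock_guard<std::mutex> guard(_mu);
        return std::vector<uint64_t>(_ids.begin(), _ids.end());
    }

private:
    mutable std::mutex _mu;
    std::set<uint64_t> _ids;
};
```

   The checker would call Snapshot() once per pass, so the lock is held only
   for the duration of the copy and not while sockets are inspected.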



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

