This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new e1bf467b set tags workers unlimitedly (#2801)
e1bf467b is described below

commit e1bf467b20e1ca12023ed11b0aad567f6653a735
Author: Jade <zhengj...@qq.com>
AuthorDate: Thu Oct 31 10:13:43 2024 +0800

    set tags workers unlimitedly (#2801)
    
    * set tags workers unlimitedly
    
    * fix set concurrency test
    
    ---------
    
    Co-authored-by: jiazheng.jia <jiazheng....@antgroup.com>
---
 docs/cn/bthread_tagged_task_group.md     |  6 +++---
 example/bthread_tag_echo_c++/server.cpp  |  4 ++--
 src/brpc/server.cpp                      |  6 +-----
 src/bthread/bthread.cpp                  | 20 +++++++++++---------
 test/bthread_setconcurrency_unittest.cpp |  6 ++++--
 5 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/docs/cn/bthread_tagged_task_group.md 
b/docs/cn/bthread_tagged_task_group.md
index bfdafd76..027bd4eb 100644
--- a/docs/cn/bthread_tagged_task_group.md
+++ b/docs/cn/bthread_tagged_task_group.md
@@ -11,19 +11,19 @@
 
 ```c++
 服务端启动
-./echo_server -task_group_ntags 3 -tag1 0 -tag2 1 -bthread_concurrency 20 
-bthread_min_concurrency 12 -event_dispatcher_num 1
+./echo_server -task_group_ntags 3 -tag1 0 -tag2 1 -bthread_concurrency 20 
-bthread_min_concurrency 8 -event_dispatcher_num 1
 
 客户端启动
 ./echo_client -dummy_port 8888 -server "0.0.0.0:8002" -use_bthread true
 ./echo_client -dummy_port 8889 -server "0.0.0.0:8003" -use_bthread true
 ```
 
-FLAGS_bthread_concurrency为所有分组的线程数的上限,FLAGS_bthread_min_concurrency为所有分组的线程数的下限,FLAGS_event_dispatcher_num为单个分组中事件驱动器的数量。FLAGS_bthread_current_tag为将要修改的分组的tag值,FLAGS_bthread_concurrency_by_tag设置这个分组的线程数。
+FLAGS_bthread_concurrency为所有线程的数,FLAGS_bthread_min_concurrency为所有分组的线程数的下限,FLAGS_event_dispatcher_num为单个分组中事件驱动器的数量。FLAGS_bthread_current_tag为将要修改的分组的tag值,FLAGS_bthread_concurrency_by_tag设置这个分组的线程数。
 
一般情况应用创建的bthread不需要设置bthread_attr_t的tag字段,创建的bthread会在当前tag上下文中执行;如果希望创建的bthread不在当前tag上下文中执行,可以设置bthread_attr_t的tag字段为希望的值,这么做会对性能有些损失,关键路径上应该避免这么做。
 
 Q:如何动态改变分组线程的数量?
 
-A:server的线程数最少为4个,后台任务线程数最少为2个,所以上面的例子中,FLAGS_bthread_concurrency最小值为4+4+2=10,再设置FLAGS_bthread_min_concurrency=FLAGS_bthread_concurrency,之后再把FLAGS_bthread_concurrency改大一些,之后再设置FLAGS_bthread_current_tag和FLAGS_bthread_concurrency_by_tag来改变某个分组的线程数。对于server,如果设置了ServerOption.bthread_tag,num_threads的含义是这个分组的线程数;如果没有设置(相当于没有启用分组,默认值为BTHREAD_TAG_INVALID),num_thread的含义是所有分组的线程数。
+A:你可以根据你的服务更自由的设计你的每个分组的线程数,启动的时候会根据你设置的 bthread_concurrency 来初始化线程池,如果你设置了 
bthread_min_concurrency,那么会根据 bthread_min_concurrency 来设置线程池,对于 server 
来说,num_threads 就是该 tag 对应的 worker 数量。可以通过设置 FLAGS_bthread_current_tag 和 
FLAGS_bthread_concurrency_by_tag 
来改变某个分组的线程数。如果没有设置(相当于没有启用分组,默认值为BTHREAD_TAG_INVALID),num_threads的含义是所有分组的 
worker 总数。
 
 Q:不同分组之间有什么关系吗?
 
diff --git a/example/bthread_tag_echo_c++/server.cpp 
b/example/bthread_tag_echo_c++/server.cpp
index bc717e25..ed4ba4d6 100644
--- a/example/bthread_tag_echo_c++/server.cpp
+++ b/example/bthread_tag_echo_c++/server.cpp
@@ -29,8 +29,8 @@ DEFINE_int32(port2, 8003, "TCP Port of this server");
 DEFINE_int32(tag1, 0, "Server1 tag");
 DEFINE_int32(tag2, 1, "Server2 tag");
 DEFINE_int32(tag3, 2, "Background task tag");
-DEFINE_int32(num_threads1, 4, "Thread number of server1");
-DEFINE_int32(num_threads2, 4, "Thread number of server2");
+DEFINE_int32(num_threads1, 6, "Thread number of server1");
+DEFINE_int32(num_threads2, 16, "Thread number of server2");
 DEFINE_int32(idle_timeout_s, -1,
              "Connection will be closed if there is no "
              "read/write operations during the last `idle_timeout_s'");
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index fa3ab7d7..0110761a 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -1044,11 +1044,7 @@ int Server::StartInternal(const butil::EndPoint& 
endpoint,
         if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) {
             _options.num_threads = BTHREAD_MIN_CONCURRENCY;
         }
-        if (original_bthread_tag == BTHREAD_TAG_INVALID) {
-            bthread_setconcurrency(_options.num_threads);
-        } else {
-            bthread_setconcurrency_by_tag(_options.num_threads, 
_options.bthread_tag);
-        }
+        bthread_setconcurrency_by_tag(_options.num_threads, 
_options.bthread_tag);
     }
 
     for (MethodMap::iterator it = _method_map.begin();
diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp
index dcd29c43..f963c4a6 100644
--- a/src/bthread/bthread.cpp
+++ b/src/bthread/bthread.cpp
@@ -397,20 +397,22 @@ int bthread_setconcurrency_by_tag(int num, bthread_tag_t 
tag) {
     }
     auto c = bthread::get_or_new_task_control();
     BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
-    auto ngroup = c->concurrency();
     auto tag_ngroup = c->concurrency(tag);
     auto add = num - tag_ngroup;
-    if (ngroup + add > bthread::FLAGS_bthread_concurrency) {
-        LOG(ERROR) << "Fail to set concurrency by tag " << tag
-                   << ", Total concurrency larger than bthread_concurrency";
-        return EPERM;
-    }
-    auto added = 0;
+
     if (add > 0) {
-        added = c->add_workers(add, tag);
+        auto added = c->add_workers(add, tag);
+        bthread::FLAGS_bthread_concurrency += added;
         return (add == added ? 0 : EPERM);
+
+    } else if (add < 0){
+        LOG(WARNING) << "Fail to set concurrency by tag: " << tag
+                     << ", tag concurrency must larger than old oncurrency. 
old concurrency: "
+                     << tag_ngroup << ", new concurrency: " << num;
+        return EPERM;
+    } else {
+        return 0;
     }
-    return (num == tag_ngroup ? 0 : EPERM);
 }
 
 int bthread_about_to_quit() {
diff --git a/test/bthread_setconcurrency_unittest.cpp 
b/test/bthread_setconcurrency_unittest.cpp
index 7c8faf40..aa1d674c 100644
--- a/test/bthread_setconcurrency_unittest.cpp
+++ b/test/bthread_setconcurrency_unittest.cpp
@@ -214,9 +214,11 @@ int concurrency_by_tag(int num) {
 
 TEST(BthreadTest, concurrency_by_tag) {
     ASSERT_EQ(concurrency_by_tag(1), false);
-    auto con = bthread_getconcurrency_by_tag(0);
+    auto tag_con = bthread_getconcurrency_by_tag(0);
+    auto con = bthread_getconcurrency();
     ASSERT_EQ(concurrency_by_tag(con), true);
-    ASSERT_EQ(concurrency_by_tag(con + 1), false);
+    ASSERT_EQ(concurrency_by_tag(con + 1), true);
+    ASSERT_EQ(bthread_getconcurrency(), con+1);
     bthread_setconcurrency(con + 1);
     ASSERT_EQ(concurrency_by_tag(con + 1), true);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to