This is an automated email from the ASF dual-hosted git repository. wwbmmm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new e1bf467b set tags workers unlimitedly (#2801) e1bf467b is described below commit e1bf467b20e1ca12023ed11b0aad567f6653a735 Author: Jade <zhengj...@qq.com> AuthorDate: Thu Oct 31 10:13:43 2024 +0800 set tags workers unlimitedly (#2801) * set tags workers unlimitedly * fix set concurrency test --------- Co-authored-by: jiazheng.jia <jiazheng....@antgroup.com> --- docs/cn/bthread_tagged_task_group.md | 6 +++--- example/bthread_tag_echo_c++/server.cpp | 4 ++-- src/brpc/server.cpp | 6 +----- src/bthread/bthread.cpp | 20 +++++++++++--------- test/bthread_setconcurrency_unittest.cpp | 6 ++++-- 5 files changed, 21 insertions(+), 21 deletions(-) diff --git a/docs/cn/bthread_tagged_task_group.md b/docs/cn/bthread_tagged_task_group.md index bfdafd76..027bd4eb 100644 --- a/docs/cn/bthread_tagged_task_group.md +++ b/docs/cn/bthread_tagged_task_group.md @@ -11,19 +11,19 @@ ```c++ 服务端启动 -./echo_server -task_group_ntags 3 -tag1 0 -tag2 1 -bthread_concurrency 20 -bthread_min_concurrency 12 -event_dispatcher_num 1 +./echo_server -task_group_ntags 3 -tag1 0 -tag2 1 -bthread_concurrency 20 -bthread_min_concurrency 8 -event_dispatcher_num 1 客户端启动 ./echo_client -dummy_port 8888 -server "0.0.0.0:8002" -use_bthread true ./echo_client -dummy_port 8889 -server "0.0.0.0:8003" -use_bthread true ``` -FLAGS_bthread_concurrency为所有分组的线程数的上限,FLAGS_bthread_min_concurrency为所有分组的线程数的下限,FLAGS_event_dispatcher_num为单个分组中事件驱动器的数量。FLAGS_bthread_current_tag为将要修改的分组的tag值,FLAGS_bthread_concurrency_by_tag设置这个分组的线程数。 +FLAGS_bthread_concurrency为所有线程的数,FLAGS_bthread_min_concurrency为所有分组的线程数的下限,FLAGS_event_dispatcher_num为单个分组中事件驱动器的数量。FLAGS_bthread_current_tag为将要修改的分组的tag值,FLAGS_bthread_concurrency_by_tag设置这个分组的线程数。 一般情况应用创建的bthread不需要设置bthread_attr_t的tag字段,创建的bthread会在当前tag上下文中执行;如果希望创建的bthread不在当前tag上下文中执行,可以设置bthread_attr_t的tag字段为希望的值,这么做会对性能有些损失,关键路径上应该避免这么做。 Q:如何动态改变分组线程的数量? -A:server的线程数最少为4个,后台任务线程数最少为2个,所以上面的例子中,FLAGS_bthread_concurrency最小值为4+4+2=10,再设置FLAGS_bthread_min_concurrency=FLAGS_bthread_concurrency,之后再把FLAGS_bthread_concurrency改大一些,之后再设置FLAGS_bthread_current_tag和FLAGS_bthread_concurrency_by_tag来改变某个分组的线程数。对于server,如果设置了ServerOption.bthread_tag,num_threads的含义是这个分组的线程数;如果没有设置(相当于没有启用分组,默认值为BTHREAD_TAG_INVALID),num_thread的含义是所有分组的线程数。 +A:你可以根据你的服务更自由的设计你的每个分组的线程数,启动的时候会根据你设置的 bthread_concurrency 来初始化线程池,如果你设置了 bthread_min_concurrency,那么会根据 bthread_min_concurrency 来设置线程池,对于 server 来说,num_threads 就是该 tag 对应的 worker 数量。可以通过设置 FLAGS_bthread_current_tag 和 FLAGS_bthread_concurrency_by_tag 来改变某个分组的线程数。如果没有设置(相当于没有启用分组,默认值为BTHREAD_TAG_INVALID),num_threads的含义是所有分组的 worker 总数。 Q:不同分组之间有什么关系吗? diff --git a/example/bthread_tag_echo_c++/server.cpp b/example/bthread_tag_echo_c++/server.cpp index bc717e25..ed4ba4d6 100644 --- a/example/bthread_tag_echo_c++/server.cpp +++ b/example/bthread_tag_echo_c++/server.cpp @@ -29,8 +29,8 @@ DEFINE_int32(port2, 8003, "TCP Port of this server"); DEFINE_int32(tag1, 0, "Server1 tag"); DEFINE_int32(tag2, 1, "Server2 tag"); DEFINE_int32(tag3, 2, "Background task tag"); -DEFINE_int32(num_threads1, 4, "Thread number of server1"); -DEFINE_int32(num_threads2, 4, "Thread number of server2"); +DEFINE_int32(num_threads1, 6, "Thread number of server1"); +DEFINE_int32(num_threads2, 16, "Thread number of server2"); DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no " "read/write operations during the last `idle_timeout_s'"); diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index fa3ab7d7..0110761a 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -1044,11 +1044,7 @@ int Server::StartInternal(const butil::EndPoint& endpoint, if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) { _options.num_threads = BTHREAD_MIN_CONCURRENCY; } - if (original_bthread_tag == BTHREAD_TAG_INVALID) { - bthread_setconcurrency(_options.num_threads); - } else { - bthread_setconcurrency_by_tag(_options.num_threads, _options.bthread_tag); - } + bthread_setconcurrency_by_tag(_options.num_threads, _options.bthread_tag); } for (MethodMap::iterator it = _method_map.begin(); diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp index dcd29c43..f963c4a6 100644 --- a/src/bthread/bthread.cpp +++ b/src/bthread/bthread.cpp @@ -397,20 +397,22 @@ int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag) { } auto c = bthread::get_or_new_task_control(); BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex); - auto ngroup = c->concurrency(); auto tag_ngroup = c->concurrency(tag); auto add = num - tag_ngroup; - if (ngroup + add > bthread::FLAGS_bthread_concurrency) { - LOG(ERROR) << "Fail to set concurrency by tag " << tag - << ", Total concurrency larger than bthread_concurrency"; - return EPERM; - } - auto added = 0; + if (add > 0) { - added = c->add_workers(add, tag); + auto added = c->add_workers(add, tag); + bthread::FLAGS_bthread_concurrency += added; return (add == added ? 0 : EPERM); + + } else if (add < 0){ + LOG(WARNING) << "Fail to set concurrency by tag: " << tag + << ", tag concurrency must larger than old oncurrency. old concurrency: " + << tag_ngroup << ", new concurrency: " << num; + return EPERM; + } else { + return 0; } - return (num == tag_ngroup ? 0 : EPERM); } int bthread_about_to_quit() { diff --git a/test/bthread_setconcurrency_unittest.cpp b/test/bthread_setconcurrency_unittest.cpp index 7c8faf40..aa1d674c 100644 --- a/test/bthread_setconcurrency_unittest.cpp +++ b/test/bthread_setconcurrency_unittest.cpp @@ -214,9 +214,11 @@ int concurrency_by_tag(int num) { TEST(BthreadTest, concurrency_by_tag) { ASSERT_EQ(concurrency_by_tag(1), false); - auto con = bthread_getconcurrency_by_tag(0); + auto tag_con = bthread_getconcurrency_by_tag(0); + auto con = bthread_getconcurrency(); ASSERT_EQ(concurrency_by_tag(con), true); - ASSERT_EQ(concurrency_by_tag(con + 1), false); + ASSERT_EQ(concurrency_by_tag(con + 1), true); + ASSERT_EQ(bthread_getconcurrency(), con+1); bthread_setconcurrency(con + 1); ASSERT_EQ(concurrency_by_tag(con + 1), true); } --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org