This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 73b307a7 bthread task group add tag (#2358)
73b307a7 is described below
commit 73b307a768cfbc369ffb9c359b11f361d980ef58
Author: Yang,Liming <[email protected]>
AuthorDate: Wed Dec 20 11:47:26 2023 +0800
bthread task group add tag (#2358)
Co-authored-by: Yang Liming <[email protected]>
---
docs/cn/bthread_tagged_task_group.md | 30 ++++
docs/images/bthread_tagged_increment_all.png | Bin 0 -> 179089 bytes
docs/images/bthread_tagged_increment_tag1.png | Bin 0 -> 178168 bytes
docs/images/bthread_tagged_worker_usage.png | Bin 0 -> 190384 bytes
example/bthread_tag_echo_c++/CMakeLists.txt | 145 ++++++++++++++++++
example/bthread_tag_echo_c++/client.cpp | 154 ++++++++++++++++++++
example/bthread_tag_echo_c++/echo.proto | 33 +++++
example/bthread_tag_echo_c++/server.cpp | 146 +++++++++++++++++++
src/brpc/acceptor.cpp | 5 +-
src/brpc/acceptor.h | 3 +
src/brpc/event_dispatcher.cpp | 29 ++--
src/brpc/event_dispatcher.h | 2 +-
src/brpc/input_messenger.cpp | 1 +
src/brpc/server.cpp | 12 +-
src/brpc/server.h | 4 +
src/brpc/socket.cpp | 21 ++-
src/brpc/socket.h | 3 +
src/brpc/socket_inl.h | 1 +
src/bthread/bthread.cpp | 130 +++++++++++++++--
src/bthread/bthread.h | 9 ++
src/bthread/butex.cpp | 11 +-
src/bthread/task_control.cpp | 202 ++++++++++++++++++--------
src/bthread/task_control.h | 70 +++++++--
src/bthread/task_group.cpp | 19 ++-
src/bthread/task_group.h | 8 +
src/bthread/types.h | 26 ++--
src/bthread/unstable.h | 3 +
test/bthread_setconcurrency_unittest.cpp | 31 ++++
28 files changed, 968 insertions(+), 130 deletions(-)
diff --git a/docs/cn/bthread_tagged_task_group.md
b/docs/cn/bthread_tagged_task_group.md
new file mode 100644
index 00000000..13b8fb66
--- /dev/null
+++ b/docs/cn/bthread_tagged_task_group.md
@@ -0,0 +1,30 @@
+
+# Bthread tagged task group
+
+在很多应用开发过程中都会有线程资源隔离的需求,比如服务分为控制层和数据层,数据层的请求压力大,不希望控制层受到影响;再比如,服务有多个磁盘,希望服务不同磁盘的线程之间没有什么影响;bthread的为任务组打标签就是实现bthread的worker线程池按照tag分组,让不同分组之间达到没有互相影响的目的。服务是按照server级别做tag分组的,用户需要将不同分组的service安排到不同server上,不同server将使用不同端口。还有些场景服务需要有一些后台任务或者定时任务在单独的线程池中调度,这些任务没有service,这种情况也可以使用tag分组专门划分一个线程池,让这些任务在这个tag分组上执行。后续在这个基础上还可以实现多种策略,比如,将tag组限制在NUMA的某个组,组内线程绑核等。
+
+# 使用方式
+
+在example/bthread_tag_echo_c++里面有一个实例代码,分别启动服务端和客户端,服务端将worker划分成3个tag(分组),例子里面可以设置FLAGS_tag1,FLAGS_tag2,给不同server打标签。剩下的一个tag(分组)给服务的后台任务使用。
+
+```c++
+服务端启动
+./echo_server -task_group_ntags 3 -tag1 0 -tag2 1 -bthread_concurrency 20 -bthread_min_concurrency 12 -event_dispatcher_num 2
+
+客户端启动
+./echo_client -dummy_port 8888 -server "0.0.0.0:8002" -use_bthread true
+./echo_client -dummy_port 8889 -server "0.0.0.0:8003" -use_bthread true
+```
+
+一般情况应用创建的bthread不需要设置bthread_attr_t的tag字段,创建的bthread会在当前tag上下文中执行;如果希望创建的bthread不在当前tag上下文中执行,可以设置bthread_attr_t的tag字段为希望的值,这么做会对性能有些损失,关键路径上应该避免这么做。
+
+# 监控
+
+目前监控上按照tag划分的指标有,线程的数量、线程的使用量、bthread_count、连接信息
+
+线程使用量:![img](../images/bthread_tagged_worker_usage.png)
+
+动态调整线程数,FLAGS_bthread_concurrency是所有线程池的线程数总和,设置FLAGS_bthread_concurrency会依次为每个线程池增加线程数量,直到线程总数为FLAGS_bthread_concurrency。要设置某个线程池的数量,先设置FLAGS_bthread_current_tag为要调整的tag,之后再设置FLAGS_bthread_concurrency_by_tag为指定的线程数量,但是所有线程池的总数量不能超过FLAGS_bthread_concurrency。
+
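+设置tag1:![img](../images/bthread_tagged_increment_tag1.png)
+设置所有tag:![img](../images/bthread_tagged_increment_all.png)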
diff --git a/docs/images/bthread_tagged_increment_all.png
b/docs/images/bthread_tagged_increment_all.png
new file mode 100644
index 00000000..686b9d28
Binary files /dev/null and b/docs/images/bthread_tagged_increment_all.png differ
diff --git a/docs/images/bthread_tagged_increment_tag1.png
b/docs/images/bthread_tagged_increment_tag1.png
new file mode 100644
index 00000000..291ad80e
Binary files /dev/null and b/docs/images/bthread_tagged_increment_tag1.png
differ
diff --git a/docs/images/bthread_tagged_worker_usage.png
b/docs/images/bthread_tagged_worker_usage.png
new file mode 100644
index 00000000..748cfcda
Binary files /dev/null and b/docs/images/bthread_tagged_worker_usage.png differ
diff --git a/example/bthread_tag_echo_c++/CMakeLists.txt
b/example/bthread_tag_echo_c++/CMakeLists.txt
new file mode 100644
index 00000000..2baea53e
--- /dev/null
+++ b/example/bthread_tag_echo_c++/CMakeLists.txt
@@ -0,0 +1,145 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cmake_minimum_required(VERSION 2.8.10)
+project(bthread_tag_echo_c++ C CXX)
+
+option(LINK_SO "Whether examples are linked dynamically" OFF)
+
+# Search two directories up for an "output/include" directory and use its
+# parent as the prefix for the find_* calls below.
+execute_process(
+    COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'"
+    OUTPUT_VARIABLE OUTPUT_PATH
+)
+
+set(CMAKE_PREFIX_PATH ${OUTPUT_PATH})
+
+include(FindThreads)
+include(FindProtobuf)
+protobuf_generate_cpp(PROTO_SRC PROTO_HEADER echo.proto)
+# include PROTO_HEADER
+include_directories(${CMAKE_CURRENT_BINARY_DIR})
+
+# Search for libthrift* by best effort. If it is not found and brpc is
+# compiled with thrift protocol enabled, a link error would be reported.
+find_library(THRIFT_LIB NAMES thrift)
+if (NOT THRIFT_LIB)
+    set(THRIFT_LIB "")
+endif()
+
+find_path(GPERFTOOLS_INCLUDE_DIR NAMES gperftools/heap-profiler.h)
+find_library(GPERFTOOLS_LIBRARIES NAMES tcmalloc_and_profiler)
+include_directories(${GPERFTOOLS_INCLUDE_DIR})
+
+# brpc itself: prefer the static archive unless LINK_SO is ON.
+find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h)
+if(LINK_SO)
+    find_library(BRPC_LIB NAMES brpc)
+else()
+    find_library(BRPC_LIB NAMES libbrpc.a brpc)
+endif()
+if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB))
+    message(FATAL_ERROR "Fail to find brpc")
+endif()
+include_directories(${BRPC_INCLUDE_PATH})
+
+find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h)
+find_library(GFLAGS_LIBRARY NAMES gflags libgflags)
+if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY))
+    message(FATAL_ERROR "Fail to find gflags")
+endif()
+include_directories(${GFLAGS_INCLUDE_PATH})
+
+# Detect the namespace used by the installed gflags so that the sources can
+# refer to it through the GFLAGS_NS macro.
+execute_process(
+    COMMAND bash -c "grep \"namespace [_A-Za-z0-9]\\+ {\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $2}' | tr -d '\n'"
+    OUTPUT_VARIABLE GFLAGS_NS
+)
+# Quote the expansion: when the grep above matches nothing GFLAGS_NS is empty
+# and an unquoted `if(${GFLAGS_NS} STREQUAL ...)` is a CMake syntax error.
+if("${GFLAGS_NS}" STREQUAL "GFLAGS_NAMESPACE")
+    execute_process(
+        COMMAND bash -c "grep \"#define GFLAGS_NAMESPACE [_A-Za-z0-9]\\+\" ${GFLAGS_INCLUDE_PATH}/gflags/gflags_declare.h | head -1 | awk '{print $3}' | tr -d '\n'"
+        OUTPUT_VARIABLE GFLAGS_NS
+    )
+endif()
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+    include(CheckFunctionExists)
+    CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME)
+    if(NOT HAVE_CLOCK_GETTIME)
+        set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC")
+    endif()
+endif()
+
+set(CMAKE_CPP_FLAGS "${DEFINE_CLOCK_GETTIME} -DGFLAGS_NS=${GFLAGS_NS}")
+set(CMAKE_CXX_FLAGS "${CMAKE_CPP_FLAGS} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer")
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DBRPC_ENABLE_CPU_PROFILER")
+
+# Request C++11: through compiler flags on old CMake, through
+# CMAKE_CXX_STANDARD otherwise.
+if(CMAKE_VERSION VERSION_LESS "3.1.3")
+    if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+    endif()
+    if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
+        set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
+    endif()
+else()
+    set(CMAKE_CXX_STANDARD 11)
+    set(CMAKE_CXX_STANDARD_REQUIRED ON)
+endif()
+
+find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h)
+find_library(LEVELDB_LIB NAMES leveldb)
+if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB))
+    message(FATAL_ERROR "Fail to find leveldb")
+endif()
+include_directories(${LEVELDB_INCLUDE_PATH})
+
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+    set(OPENSSL_ROOT_DIR
+        "/usr/local/opt/openssl" # Homebrew installed OpenSSL
+        )
+endif()
+
+find_package(OpenSSL)
+include_directories(${OPENSSL_INCLUDE_DIR})
+
+
+# Libraries every example executable links against in addition to brpc.
+set(DYNAMIC_LIB
+    ${CMAKE_THREAD_LIBS_INIT}
+    ${GFLAGS_LIBRARY}
+    ${PROTOBUF_LIBRARIES}
+    ${LEVELDB_LIB}
+    ${OPENSSL_CRYPTO_LIBRARY}
+    ${OPENSSL_SSL_LIBRARY}
+    ${THRIFT_LIB}
+    dl
+    )
+
+if(CMAKE_SYSTEM_NAME STREQUAL "Darwin")
+    set(DYNAMIC_LIB ${DYNAMIC_LIB}
+        pthread
+        "-framework CoreFoundation"
+        "-framework CoreGraphics"
+        "-framework CoreData"
+        "-framework CoreText"
+        "-framework Security"
+        "-framework Foundation"
+        "-Wl,-U,_MallocExtension_ReleaseFreeMemory"
+        "-Wl,-U,_ProfilerStart"
+        "-Wl,-U,_ProfilerStop")
+endif()
+
+add_executable(echo_client client.cpp ${PROTO_SRC} ${PROTO_HEADER})
+add_executable(echo_server server.cpp ${PROTO_SRC} ${PROTO_HEADER})
+
+target_link_libraries(echo_client ${BRPC_LIB} ${DYNAMIC_LIB} ${GPERFTOOLS_LIBRARIES})
+target_link_libraries(echo_server ${BRPC_LIB} ${DYNAMIC_LIB} ${GPERFTOOLS_LIBRARIES})
diff --git a/example/bthread_tag_echo_c++/client.cpp
b/example/bthread_tag_echo_c++/client.cpp
new file mode 100644
index 00000000..2af1a8a6
--- /dev/null
+++ b/example/bthread_tag_echo_c++/client.cpp
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A client sending requests to server by multiple threads.
+
+#include <gflags/gflags.h>
+#include <bthread/bthread.h>
+#include <butil/logging.h>
+#include <brpc/server.h>
+#include <brpc/channel.h>
+#include "echo.pb.h"
+#include <bvar/bvar.h>
+
+// Command-line flags of the example client. Each description is kept on a
+// single line so the string literals stay well-formed.
+DEFINE_int32(thread_num, 50, "Number of threads to send requests");
+DEFINE_bool(use_bthread, false, "Use bthread to send requests");
+DEFINE_int32(attachment_size, 0, "Carry so many byte attachment along with requests");
+DEFINE_int32(request_size, 16, "Bytes of each request");
+DEFINE_string(protocol, "baidu_std", "Protocol type. Defined in src/brpc/options.proto");
+DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
+DEFINE_string(server, "0.0.0.0:8002", "IP Address of server");
+DEFINE_string(load_balancer, "", "The algorithm for load balancing");
+DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
+DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
+DEFINE_bool(dont_fail, false, "Print fatal when some call failed");
+DEFINE_int32(dummy_port, -1, "Launch dummy server at this port");
+
+// Request payload and attachment, filled once in main() and read by every
+// sender thread.
+std::string g_request;
+std::string g_attachment;
+
+// Latency of successful calls and number of failed calls.
+bvar::LatencyRecorder g_latency_recorder("client");
+bvar::Adder<int> g_error_count("client_error_count");
+
+// Thread entry point: sends Echo requests synchronously through the channel
+// passed in `arg' until the process is asked to quit. Always returns NULL.
+static void* sender(void* arg) {
+    // Calls go through a stub wrapping the channel rather than through the
+    // channel itself. The stub could be shared by all threads as well.
+    example::EchoService_Stub stub(static_cast<google::protobuf::RpcChannel*>(arg));
+
+    for (int next_log_id = 0; !brpc::IsAskedToQuit(); ++next_log_id) {
+        // The call below is synchronous, so request, response and controller
+        // can live on the stack.
+        example::EchoRequest request;
+        example::EchoResponse response;
+        brpc::Controller cntl;
+
+        request.set_message(g_request);
+        cntl.set_log_id(next_log_id);  // set by user
+        // The attachment is wired to network directly instead of being
+        // serialized into protobuf messages.
+        cntl.request_attachment().append(g_attachment);
+
+        // `done' (the last parameter) is NULL, so this blocks until the
+        // response comes back or an error (including timeout) occurs.
+        stub.Echo(&cntl, &request, &response, NULL);
+        if (!cntl.Failed()) {
+            g_latency_recorder << cntl.latency_us();
+            continue;
+        }
+
+        g_error_count << 1;
+        CHECK(brpc::IsAskedToQuit() || !FLAGS_dont_fail)
+            << "error=" << cntl.ErrorText() << " latency=" << cntl.latency_us();
+        // The call failed; sleep a while so that this thread does not spin
+        // too fast. A production server should continue its business logic
+        // rather than sleeping.
+        bthread_usleep(50000);
+    }
+    return NULL;
+}
+
+// Client entry point: initializes one channel from the command-line flags,
+// starts FLAGS_thread_num sender threads (pthreads or bthreads), prints qps
+// and latency once per second until asked to quit, then joins the senders.
+// Returns 0 on a normal exit and -1 when initialization fails.
+int main(int argc, char* argv[]) {
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+
+    // A Channel represents a communication line to a Server. It is
+    // thread-safe and is shared by all sender threads below.
+    brpc::Channel channel;
+
+    brpc::ChannelOptions channel_options;
+    channel_options.protocol = FLAGS_protocol;
+    channel_options.connection_type = FLAGS_connection_type;
+    channel_options.connect_timeout_ms = std::min(FLAGS_timeout_ms / 2, 100);
+    channel_options.timeout_ms = FLAGS_timeout_ms;
+    channel_options.max_retry = FLAGS_max_retry;
+    const int init_rc =
+        channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &channel_options);
+    if (init_rc != 0) {
+        LOG(ERROR) << "Fail to initialize channel";
+        return -1;
+    }
+
+    // Build the payloads shared by all senders.
+    if (FLAGS_attachment_size > 0) {
+        g_attachment.resize(FLAGS_attachment_size, 'a');
+    }
+    if (FLAGS_request_size <= 0) {
+        LOG(ERROR) << "Bad request_size=" << FLAGS_request_size;
+        return -1;
+    }
+    g_request.resize(FLAGS_request_size, 'r');
+
+    if (FLAGS_dummy_port >= 0) {
+        brpc::StartDummyServerAt(FLAGS_dummy_port);
+    }
+
+    // Only the vector matching the selected thread type gets sized.
+    std::vector<bthread_t> bthreads;
+    std::vector<pthread_t> pthreads;
+    if (FLAGS_use_bthread) {
+        bthreads.resize(FLAGS_thread_num);
+        for (int idx = 0; idx < FLAGS_thread_num; ++idx) {
+            if (bthread_start_background(&bthreads[idx], nullptr, sender, &channel) != 0) {
+                LOG(ERROR) << "Fail to create bthread";
+                return -1;
+            }
+        }
+    } else {
+        pthreads.resize(FLAGS_thread_num);
+        for (int idx = 0; idx < FLAGS_thread_num; ++idx) {
+            if (pthread_create(&pthreads[idx], NULL, sender, &channel) != 0) {
+                LOG(ERROR) << "Fail to create pthread";
+                return -1;
+            }
+        }
+    }
+
+    // Report once per second until the process is asked to quit.
+    while (!brpc::IsAskedToQuit()) {
+        sleep(1);
+        LOG(INFO) << "Sending EchoRequest at qps=" << g_latency_recorder.qps(1)
+                  << " latency=" << g_latency_recorder.latency(1);
+    }
+
+    LOG(INFO) << "EchoClient is going to quit";
+    for (int idx = 0; idx < FLAGS_thread_num; ++idx) {
+        if (FLAGS_use_bthread) {
+            bthread_join(bthreads[idx], NULL);
+        } else {
+            pthread_join(pthreads[idx], NULL);
+        }
+    }
+
+    return 0;
+}
diff --git a/example/bthread_tag_echo_c++/echo.proto
b/example/bthread_tag_echo_c++/echo.proto
new file mode 100644
index 00000000..e963faf5
--- /dev/null
+++ b/example/bthread_tag_echo_c++/echo.proto
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax="proto2";
+option cc_generic_services = true;
+
+package example;
+
+// Request carrying the text to be echoed.
+message EchoRequest {
+    required string message = 1;
+}
+
+// Response carrying the echoed text.
+message EchoResponse {
+    required string message = 1;
+}
+
+// Service with a single unary Echo method.
+service EchoService {
+    rpc Echo(EchoRequest) returns (EchoResponse);
+}
diff --git a/example/bthread_tag_echo_c++/server.cpp
b/example/bthread_tag_echo_c++/server.cpp
new file mode 100644
index 00000000..4086c4a0
--- /dev/null
+++ b/example/bthread_tag_echo_c++/server.cpp
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A server to receive EchoRequest and send back EchoResponse.
+
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <brpc/server.h>
+#include <bthread/unstable.h>
+#include "echo.pb.h"
+
+// Command-line flags of the example server. main() starts two servers:
+// port1/tag1/internal_port1 configure the first one and
+// port2/tag2/internal_port2 the second one. tag3 is the tag assigned to the
+// background bthread that main() starts.
+DEFINE_bool(echo_attachment, true, "Echo attachment as well");
+DEFINE_int32(port1, 8002, "TCP Port of this server");
+DEFINE_int32(port2, 8003, "TCP Port of this server");
+DEFINE_int32(tag1, 0, "Server1 tag");
+DEFINE_int32(tag2, 1, "Server2 tag");
+DEFINE_int32(tag3, 2, "Background task tag");
+DEFINE_int32(idle_timeout_s, -1,
+ "Connection will be closed if there is no "
+ "read/write operations during the last `idle_timeout_s'");
+DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
+DEFINE_int32(internal_port1, -1, "Only allow builtin services at this port");
+DEFINE_int32(internal_port2, -1, "Only allow builtin services at this port");
+
+namespace example {
+// EchoService implementation that copies the request message, and optionally
+// the request attachment, into the response.
+class EchoServiceImpl : public EchoService {
+public:
+    EchoServiceImpl() {}
+    ~EchoServiceImpl() {}
+
+    // Handles one Echo call. `done' is run when this function returns.
+    void Echo(google::protobuf::RpcController* cntl_base, const EchoRequest* request,
+              EchoResponse* response, google::protobuf::Closure* done) {
+        // Runs `done' on every exit path of this function.
+        brpc::ClosureGuard guard(done);
+        brpc::Controller* controller = static_cast<brpc::Controller*>(cntl_base);
+
+        response->set_message(request->message());
+        if (!FLAGS_echo_attachment) {
+            return;
+        }
+        // Send the attachment back as well.
+        controller->response_attachment().append(controller->request_attachment());
+    }
+};
+}  // namespace example
+
+DEFINE_bool(h, false, "print help information");
+
+// Logs the tag it is called with. main() registers this function through
+// bthread_set_tagged_worker_startfn().
+static void my_tagged_worker_start_fn(bthread_tag_t tag) {
+ LOG(INFO) << "run tagged worker start function tag=" << tag;
+}
+
+// Background bthread entry point: logs the tag of the worker group it runs
+// in, sleeps for one second and finishes. The argument is unused.
+// Always returns nullptr.
+static void* my_background_task(void*) {
+    LOG(INFO) << "run background task tag=" << bthread_self_tag();
+    bthread_usleep(1000000UL);
+    // A function returning void* must return a value; flowing off the end
+    // is undefined behaviour.
+    return nullptr;
+}
+
+// Server entry point: starts two echo servers, each bound to its own bthread
+// tag (FLAGS_tag1 / FLAGS_tag2), plus one background bthread on FLAGS_tag3,
+// then serves until asked to quit.
+// Returns 0 on a normal exit and -1 when a service cannot be added or a
+// server cannot be started.
+int main(int argc, char* argv[]) {
+    std::string help_str = "dummy help infomation";
+    GFLAGS_NS::SetUsageMessage(help_str);
+
+    // Parse gflags. We recommend you to use gflags as well.
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+
+    if (FLAGS_h) {
+        fprintf(stderr, "%s\n%s\n%s", help_str.c_str(), help_str.c_str(), help_str.c_str());
+        return 0;
+    }
+
+    // Set tagged worker function
+    bthread_set_tagged_worker_startfn(my_tagged_worker_start_fn);
+
+    // First server, configured with FLAGS_tag1.
+    brpc::Server server1;
+
+    // Instance of your service.
+    example::EchoServiceImpl echo_service_impl1;
+
+    // Add the service into server. Notice the second parameter, because the
+    // service is put on stack, we don't want server to delete it, otherwise
+    // use brpc::SERVER_OWNS_SERVICE.
+    if (server1.AddService(&echo_service_impl1, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
+        LOG(ERROR) << "Fail to add service";
+        return -1;
+    }
+
+    // Start the first server.
+    brpc::ServerOptions options1;
+    options1.idle_timeout_sec = FLAGS_idle_timeout_s;
+    options1.max_concurrency = FLAGS_max_concurrency;
+    options1.internal_port = FLAGS_internal_port1;
+    options1.bthread_tag = FLAGS_tag1;
+    if (server1.Start(FLAGS_port1, &options1) != 0) {
+        LOG(ERROR) << "Fail to start EchoServer";
+        return -1;
+    }
+
+    // Second server, configured with FLAGS_tag2. It gets its own service
+    // instance, which is also kept on the stack and not owned by the server.
+    brpc::Server server2;
+    example::EchoServiceImpl echo_service_impl2;
+    if (server2.AddService(&echo_service_impl2, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
+        LOG(ERROR) << "Fail to add service";
+        return -1;
+    }
+
+    // Start the second server.
+    brpc::ServerOptions options2;
+    options2.idle_timeout_sec = FLAGS_idle_timeout_s;
+    options2.max_concurrency = FLAGS_max_concurrency;
+    options2.internal_port = FLAGS_internal_port2;
+    options2.bthread_tag = FLAGS_tag2;
+    if (server2.Start(FLAGS_port2, &options2) != 0) {
+        LOG(ERROR) << "Fail to start EchoServer";
+        return -1;
+    }
+
+    // Start background task with FLAGS_tag3. A failure is reported but does
+    // not stop the two servers, which are already running.
+    bthread_t tid;
+    bthread_attr_t attr = BTHREAD_ATTR_NORMAL;
+    attr.tag = FLAGS_tag3;
+    const bool background_started =
+        bthread_start_background(&tid, &attr, my_background_task, nullptr) == 0;
+    if (!background_started) {
+        LOG(ERROR) << "Fail to start background task with tag=" << FLAGS_tag3;
+    }
+
+    // Wait until Ctrl-C is pressed, then Stop() and Join() the server.
+    server1.RunUntilAskedToQuit();
+    server2.RunUntilAskedToQuit();
+
+    // Do not leave main() while the background bthread may still be running.
+    if (background_started) {
+        bthread_join(tid, nullptr);
+    }
+
+    return 0;
+}
diff --git a/src/brpc/acceptor.cpp b/src/brpc/acceptor.cpp
index f2d1c087..68d77082 100644
--- a/src/brpc/acceptor.cpp
+++ b/src/brpc/acceptor.cpp
@@ -40,7 +40,8 @@ Acceptor::Acceptor(bthread_keytable_pool_t* pool)
, _empty_cond(&_map_mutex)
, _force_ssl(false)
, _ssl_ctx(NULL)
- , _use_rdma(false) {
+ , _use_rdma(false)
+ , _bthread_tag(BTHREAD_TAG_DEFAULT) {
}
Acceptor::~Acceptor() {
@@ -90,6 +91,7 @@ int Acceptor::StartAccept(int listened_fd, int
idle_timeout_sec,
SocketOptions options;
options.fd = listened_fd;
options.user = this;
+ options.bthread_tag = _bthread_tag;
options.on_edge_triggered_events = OnNewConnections;
if (Socket::Create(options, &_acception_id) != 0) {
// Close-idle-socket thread will be stopped inside destructor
@@ -295,6 +297,7 @@ void Acceptor::OnNewConnectionsUntilEAGAIN(Socket*
acception) {
options.on_edge_triggered_events = InputMessenger::OnNewMessages;
}
options.use_rdma = am->_use_rdma;
+ options.bthread_tag = am->_bthread_tag;
if (Socket::Create(options, &socket_id) != 0) {
LOG(ERROR) << "Fail to create Socket";
continue;
diff --git a/src/brpc/acceptor.h b/src/brpc/acceptor.h
index c82cdcc1..69f632aa 100644
--- a/src/brpc/acceptor.h
+++ b/src/brpc/acceptor.h
@@ -112,6 +112,9 @@ private:
// Whether to use rdma or not
bool _use_rdma;
+
+ // Acceptor belongs to this tag
+ bthread_tag_t _bthread_tag;
};
} // namespace brpc
diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp
index e6209286..f747206a 100644
--- a/src/brpc/event_dispatcher.cpp
+++ b/src/brpc/event_dispatcher.cpp
@@ -25,6 +25,8 @@
#include "brpc/event_dispatcher.h"
#include "brpc/reloadable_flags.h"
+DECLARE_int32(task_group_ntags);
+
namespace brpc {
DEFINE_int32(event_dispatcher_num, 1, "Number of event dispatcher");
@@ -36,30 +38,35 @@ static EventDispatcher* g_edisp = NULL;
static pthread_once_t g_edisp_once = PTHREAD_ONCE_INIT;
static void StopAndJoinGlobalDispatchers() {
- for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
- g_edisp[i].Stop();
- g_edisp[i].Join();
+ for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
+ for (int j = 0; j < FLAGS_event_dispatcher_num; ++j) {
+ g_edisp[i * FLAGS_event_dispatcher_num + j].Stop();
+ g_edisp[i * FLAGS_event_dispatcher_num + j].Join();
+ }
}
}
void InitializeGlobalDispatchers() {
- g_edisp = new EventDispatcher[FLAGS_event_dispatcher_num];
- for (int i = 0; i < FLAGS_event_dispatcher_num; ++i) {
- const bthread_attr_t attr = FLAGS_usercode_in_pthread ?
- BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;
- CHECK_EQ(0, g_edisp[i].Start(&attr));
+ g_edisp = new EventDispatcher[FLAGS_task_group_ntags *
FLAGS_event_dispatcher_num];
+ for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
+ for (int j = 0; j < FLAGS_event_dispatcher_num; ++j) {
+ bthread_attr_t attr =
+ FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD :
BTHREAD_ATTR_NORMAL;
+ attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags;
+ CHECK_EQ(0, g_edisp[i * FLAGS_event_dispatcher_num +
j].Start(&attr));
+ }
}
// This atexit is will be run before g_task_control.stop() because above
// Start() initializes g_task_control by creating bthread (to run
epoll/kqueue).
CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));
}
-EventDispatcher& GetGlobalEventDispatcher(int fd) {
+EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag) {
pthread_once(&g_edisp_once, InitializeGlobalDispatchers);
- if (FLAGS_event_dispatcher_num == 1) {
+ if (FLAGS_task_group_ntags == 1 && FLAGS_event_dispatcher_num == 1) {
return g_edisp[0];
}
int index = butil::fmix32(fd) % FLAGS_event_dispatcher_num;
- return g_edisp[index];
+ return g_edisp[tag * FLAGS_event_dispatcher_num + index];
}
} // namespace brpc
diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h
index 1f165cfc..d18c213e 100644
--- a/src/brpc/event_dispatcher.h
+++ b/src/brpc/event_dispatcher.h
@@ -99,7 +99,7 @@ private:
int _wakeup_fds[2];
};
-EventDispatcher& GetGlobalEventDispatcher(int fd);
+EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag);
} // namespace brpc
diff --git a/src/brpc/input_messenger.cpp b/src/brpc/input_messenger.cpp
index 43167d5b..e619af74 100644
--- a/src/brpc/input_messenger.cpp
+++ b/src/brpc/input_messenger.cpp
@@ -194,6 +194,7 @@ static void QueueMessage(InputMessageBase* to_run_msg,
BTHREAD_ATTR_PTHREAD :
BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
tmp.keytable_pool = keytable_pool;
+ tmp.tag = bthread_self_tag();
if (bthread_start_background(
&th, &tmp, ProcessInputMessage, to_run_msg) == 0) {
++*num_bthread_created;
diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp
index 99add519..ac8f29c9 100644
--- a/src/brpc/server.cpp
+++ b/src/brpc/server.cpp
@@ -87,6 +87,8 @@ extern "C" {
void* bthread_get_assigned_data();
}
+DECLARE_int32(task_group_ntags);
+
namespace brpc {
BAIDU_CASSERT(sizeof(int32_t) == sizeof(butil::subtle::Atomic32),
@@ -144,7 +146,8 @@ ServerOptions::ServerOptions()
, http_master_service(NULL)
, health_reporter(NULL)
, rtmp_service(NULL)
- , redis_service(NULL) {
+ , redis_service(NULL)
+ , bthread_tag(BTHREAD_TAG_DEFAULT) {
if (s_ncore > 0) {
num_threads = s_ncore + 1;
}
@@ -1071,6 +1074,13 @@ int Server::StartInternal(const butil::EndPoint&
endpoint,
return -1;
}
_am->_use_rdma = _options.use_rdma;
+ if (_options.bthread_tag < BTHREAD_TAG_DEFAULT ||
+ _options.bthread_tag >= FLAGS_task_group_ntags) {
+ LOG(ERROR) << "Fail to set tag " << _options.bthread_tag << ",
tag range is ["
+ << BTHREAD_TAG_DEFAULT << ":" <<
FLAGS_task_group_ntags << ")";
+ return -1;
+ }
+ _am->_bthread_tag = _options.bthread_tag;
}
// Set `_status' to RUNNING before accepting connections
// to prevent requests being rejected as ELOGOFF
diff --git a/src/brpc/server.h b/src/brpc/server.h
index 52262e51..4843d0d0 100644
--- a/src/brpc/server.h
+++ b/src/brpc/server.h
@@ -262,6 +262,10 @@ struct ServerOptions {
// Default: ""
std::string server_info_name;
+ // Server will run in this tagged bthread worker group
+ // Default: BTHREAD_TAG_DEFAULT
+ bthread_tag_t bthread_tag;
+
private:
// SSLOptions is large and not often used, allocate it on heap to
// prevent ServerOptions from being bloated in most cases.
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index 6b7eadf3..acd1b54d 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -582,7 +582,7 @@ int Socket::ResetFileDescriptor(int fd) {
EnableKeepaliveIfNeeded(fd);
if (_on_edge_triggered_events) {
- if (GetGlobalEventDispatcher(fd).AddConsumer(id(), fd) != 0) {
+ if (GetGlobalEventDispatcher(fd, _bthread_tag).AddConsumer(id(), fd)
!= 0) {
PLOG(ERROR) << "Fail to add SocketId=" << id()
<< " into EventDispatcher";
_fd.store(-1, butil::memory_order_release);
@@ -743,6 +743,7 @@ int Socket::Create(const SocketOptions& options, SocketId*
id) {
m->_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
m->_unwritten_bytes.store(0, butil::memory_order_relaxed);
m->_keepalive_options = options.keepalive_options;
+ m->_bthread_tag = options.bthread_tag;
CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed));
// Must be last one! Internal fields of this Socket may be access
// just after calling ResetFileDescriptor.
@@ -795,7 +796,7 @@ int Socket::WaitAndReset(int32_t expected_nref) {
const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
if (ValidFileDescriptor(prev_fd)) {
if (_on_edge_triggered_events != NULL) {
- GetGlobalEventDispatcher(prev_fd).RemoveConsumer(prev_fd);
+ GetGlobalEventDispatcher(prev_fd,
_bthread_tag).RemoveConsumer(prev_fd);
}
close(prev_fd);
if (CreatedByConnect()) {
@@ -1103,7 +1104,7 @@ void Socket::OnRecycle() {
const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
if (ValidFileDescriptor(prev_fd)) {
if (_on_edge_triggered_events != NULL) {
- GetGlobalEventDispatcher(prev_fd).RemoveConsumer(prev_fd);
+ GetGlobalEventDispatcher(prev_fd,
_bthread_tag).RemoveConsumer(prev_fd);
}
close(prev_fd);
if (create_by_connect) {
@@ -1231,7 +1232,7 @@ int Socket::WaitEpollOut(int fd, bool pollin, const
timespec* abstime) {
// Do not need to check addressable since it will be called by
// health checker which called `SetFailed' before
const int expected_val =
_epollout_butex->load(butil::memory_order_relaxed);
- EventDispatcher& edisp = GetGlobalEventDispatcher(fd);
+ EventDispatcher& edisp = GetGlobalEventDispatcher(fd, _bthread_tag);
if (edisp.AddEpollOut(id(), fd, pollin) != 0) {
return -1;
}
@@ -1292,6 +1293,7 @@ int Socket::Connect(const timespec* abstime,
// be added into epoll device soon
SocketId connect_id;
SocketOptions options;
+ options.bthread_tag = _bthread_tag;
options.user = req;
if (Socket::Create(options, &connect_id) != 0) {
LOG(FATAL) << "Fail to create Socket";
@@ -1306,8 +1308,8 @@ int Socket::Connect(const timespec* abstime,
// Add `sockfd' into epoll so that `HandleEpollOutRequest' will
// be called with `req' when epoll event reaches
- if (GetGlobalEventDispatcher(sockfd).
- AddEpollOut(connect_id, sockfd, false) != 0) {
+ if (GetGlobalEventDispatcher(sockfd,
_bthread_tag).AddEpollOut(connect_id, sockfd, false) !=
+ 0) {
const int saved_errno = errno;
PLOG(WARNING) << "Fail to add fd=" << sockfd << " into epoll";
s->SetFailed(saved_errno, "Fail to add fd=%d into epoll: %s",
@@ -1377,7 +1379,8 @@ int Socket::ConnectIfNot(const timespec* abstime,
WriteRequest* req) {
if (_fd.load(butil::memory_order_consume) >= 0) {
return 0;
}
-
+ // Set tag for client side socket
+ _bthread_tag = bthread_self_tag();
// Have to hold a reference for `req'
SocketUniquePtr s;
ReAddress(&s);
@@ -1446,7 +1449,7 @@ int Socket::HandleEpollOutRequest(int error_code,
EpollOutRequest* req) {
}
// We've got the right to call user callback
// The timer will be removed inside destructor of EpollOutRequest
- GetGlobalEventDispatcher(req->fd).RemoveEpollOut(id(), req->fd, false);
+ GetGlobalEventDispatcher(req->fd, _bthread_tag).RemoveEpollOut(id(),
req->fd, false);
return req->on_epollout_event(req->fd, error_code, req->data);
}
@@ -2166,6 +2169,7 @@ int Socket::StartInputEvent(SocketId id, uint32_t events,
bthread_attr_t attr = thread_attr;
attr.keytable_pool = p->_keytable_pool;
+ attr.tag = bthread_self_tag();
if (bthread_start_urgent(&tid, &attr, ProcessEvent, p) != 0) {
LOG(FATAL) << "Fail to start ProcessEvent";
ProcessEvent(p);
@@ -2464,6 +2468,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
ptr->_rdma_ep->DebugInfo(os);
}
#endif
+ { os << "\nbthread_tag=" << ptr->_bthread_tag; }
}
int Socket::CheckHealth() {
diff --git a/src/brpc/socket.h b/src/brpc/socket.h
index d28d8f17..44ea0b02 100644
--- a/src/brpc/socket.h
+++ b/src/brpc/socket.h
@@ -219,6 +219,8 @@ struct SocketOptions {
// Socket keepalive related options.
// Refer to `SocketKeepaliveOptions' for details.
std::shared_ptr<SocketKeepaliveOptions> keepalive_options;
+ // Tag of this socket
+ bthread_tag_t bthread_tag;
};
// Abstractions on reading from and writing into file descriptors.
@@ -758,6 +760,7 @@ private:
// [ Set in ResetFileDescriptor ]
butil::atomic<int> _fd; // -1 when not connected.
+ bthread_tag_t _bthread_tag; // bthread tag of this socket
int _tos; // Type of service which is actually only 8bits.
int64_t _reset_fd_real_us; // When _fd was reset, in microseconds.
diff --git a/src/brpc/socket_inl.h b/src/brpc/socket_inl.h
index df93ac71..6af9e8f1 100644
--- a/src/brpc/socket_inl.h
+++ b/src/brpc/socket_inl.h
@@ -63,6 +63,7 @@ inline SocketOptions::SocketOptions()
, conn(NULL)
, app_connect(NULL)
, initial_parsing_context(NULL)
+ , bthread_tag(BTHREAD_TAG_DEFAULT)
{}
inline int Socket::Dereference() {
diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp
index 5ac0c3b1..201a6745 100644
--- a/src/bthread/bthread.cpp
+++ b/src/bthread/bthread.cpp
@@ -38,7 +38,13 @@ DEFINE_int32(bthread_min_concurrency, 0,
" The laziness is disabled when this value is non-positive,"
" and workers will be created eagerly according to
-bthread_concurrency and bthread_setconcurrency(). ");
+DEFINE_int32(bthread_current_tag, BTHREAD_TAG_DEFAULT, "Set bthread
concurrency for this tag");
+
+DEFINE_int32(bthread_concurrency_by_tag, 0,
+ "Number of pthread workers of FLAGS_bthread_current_tag");
+
static bool never_set_bthread_concurrency = true;
+static bool never_set_bthread_concurrency_by_tag = true;
static bool validate_bthread_concurrency(const char*, int32_t val) {
// bthread_setconcurrency sets the flag on success path which should
@@ -55,6 +61,17 @@ const int ALLOW_UNUSED
register_FLAGS_bthread_min_concurrency =
::GFLAGS_NS::RegisterFlagValidator(&FLAGS_bthread_min_concurrency,
validate_bthread_min_concurrency);
+static bool validate_bthread_current_tag(const char*, int32_t val);
+
+const int ALLOW_UNUSED register_FLAGS_bthread_current_tag =
+ ::GFLAGS_NS::RegisterFlagValidator(&FLAGS_bthread_current_tag,
validate_bthread_current_tag);
+
+static bool validate_bthread_concurrency_by_tag(const char*, int32_t val);
+
+const int ALLOW_UNUSED register_FLAGS_bthread_concurrency_by_tag =
+ ::GFLAGS_NS::RegisterFlagValidator(&FLAGS_bthread_concurrency_by_tag,
+ validate_bthread_concurrency_by_tag);
+
BAIDU_CASSERT(sizeof(TaskControl*) == sizeof(butil::atomic<TaskControl*>),
atomic_size_match);
pthread_mutex_t g_task_control_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -65,6 +82,7 @@ TaskControl* g_task_control = NULL;
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
extern void (*g_worker_startfn)();
+extern void (*g_tagged_worker_startfn)(bthread_tag_t);
inline TaskControl* get_task_control() {
return g_task_control;
@@ -97,6 +115,15 @@ inline TaskControl* get_or_new_task_control() {
return c;
}
+static int add_workers_for_each_tag(int num) { // Adds `num` workers one at a time, spreading them over the tags round-robin; returns the sum of what add_workers reports.
+ int added = 0;
+ auto c = get_task_control(); // NOTE(review): dereferenced below without a NULL check; presumably callers already hold a valid TaskControl - confirm
+ for (auto i = 0; i < num; ++i) {
+ added += c->add_workers(1, i % FLAGS_task_group_ntags); // the tag index restarts at 0 on every call
+ }
+ return added;
+}
+
static bool validate_bthread_min_concurrency(const char*, int32_t val) {
if (val <= 0) {
return true;
@@ -111,38 +138,62 @@ static bool validate_bthread_min_concurrency(const char*,
int32_t val) {
BAIDU_SCOPED_LOCK(g_task_control_mutex);
int concurrency = c->concurrency();
if (val > concurrency) {
- int added = c->add_workers(val - concurrency);
+ int added = bthread::add_workers_for_each_tag(val - concurrency);
return added == (val - concurrency);
} else {
return true;
}
}
+static bool validate_bthread_current_tag(const char*, int32_t val) { // gflags validator registered for FLAGS_bthread_current_tag; also refreshes FLAGS_bthread_concurrency_by_tag for the selected tag.
+ if (val < BTHREAD_TAG_DEFAULT || val >= FLAGS_task_group_ntags) { // reject tags outside [BTHREAD_TAG_DEFAULT, FLAGS_task_group_ntags)
+ return false;
+ }
+ BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
+ auto c = bthread::get_task_control();
+ if (c == NULL) { // no TaskControl yet: report zero workers for this tag but still accept the value
+ FLAGS_bthread_concurrency_by_tag = 0;
+ return true;
+ }
+ FLAGS_bthread_concurrency_by_tag = c->concurrency(val); // mirror the current per-tag concurrency into the companion flag
+ return true;
+}
+
+static bool validate_bthread_concurrency_by_tag(const char*, int32_t val) { // gflags validator registered for FLAGS_bthread_concurrency_by_tag.
+ return bthread_setconcurrency_by_tag(val, FLAGS_bthread_current_tag) == 0; // accepted only if the change succeeds for the tag currently held in FLAGS_bthread_current_tag
+}
+
__thread TaskGroup* tls_task_group_nosignal = NULL;
BUTIL_FORCE_INLINE int
start_from_non_worker(bthread_t* __restrict tid,
const bthread_attr_t* __restrict attr,
- void * (*fn)(void*),
+ void* (*fn)(void*),
void* __restrict arg) {
TaskControl* c = get_or_new_task_control();
if (NULL == c) {
return ENOMEM;
}
+ TaskGroup* g = NULL; // hoisted so both the NOSIGNAL path and the normal path share one variable
if (attr != NULL && (attr->flags & BTHREAD_NOSIGNAL)) {
// Remember the TaskGroup to insert NOSIGNAL tasks for 2 reasons:
// 1. NOSIGNAL is often for creating many bthreads in batch,
// inserting into the same TaskGroup maximizes the batch.
// 2. bthread_flush() needs to know which TaskGroup to flush.
- TaskGroup* g = tls_task_group_nosignal;
+ g = tls_task_group_nosignal; // NOTE(review): a cached group is reused without comparing its tag to attr->tag - confirm a mismatch cannot happen
if (NULL == g) {
- g = c->choose_one_group();
+ g = c->choose_one_group(attr->tag); // pick a group that belongs to the requested tag
tls_task_group_nosignal = g;
}
return g->start_background<true>(tid, attr, fn, arg);
}
- return c->choose_one_group()->start_background<true>(
- tid, attr, fn, arg);
+ g = c->choose_one_group(attr ? attr->tag : BTHREAD_TAG_DEFAULT); // a NULL attr falls back to the default tag
+ return g->start_background<true>(tid, attr, fn, arg);
+}
+
+// Returns true if the new bthread may run on the thread-local TaskGroup: either no attr was supplied or attr->tag equals that group's tag.
+BUTIL_FORCE_INLINE bool can_run_thread_local(const bthread_attr_t* __restrict
attr) {
+ return attr == nullptr || attr->tag == bthread::tls_task_group->tag(); // dereferences tls_task_group when attr is non-null; bthread_start_urgent and bthread_start_background test it for NULL before calling
}
struct TidTraits {
@@ -175,8 +226,10 @@ int bthread_start_urgent(bthread_t* __restrict tid,
void* __restrict arg) {
bthread::TaskGroup* g = bthread::tls_task_group;
if (g) {
- // start from worker
- return bthread::TaskGroup::start_foreground(&g, tid, attr, fn, arg);
+ // if attribute is null use thread local task group
+ if (bthread::can_run_thread_local(attr)) {
+ return bthread::TaskGroup::start_foreground(&g, tid, attr, fn,
arg);
+ }
}
return bthread::start_from_non_worker(tid, attr, fn, arg);
}
@@ -187,8 +240,10 @@ int bthread_start_background(bthread_t* __restrict tid,
void* __restrict arg) {
bthread::TaskGroup* g = bthread::tls_task_group;
if (g) {
- // start from worker
- return g->start_background<false>(tid, attr, fn, arg);
+ // if attribute is null use thread local task group
+ if (bthread::can_run_thread_local(attr)) {
+ return g->start_background<false>(tid, attr, fn, arg);
+ }
}
return bthread::start_from_non_worker(tid, attr, fn, arg);
}
@@ -306,13 +361,47 @@ int bthread_setconcurrency(int num) {
}
if (num > bthread::FLAGS_bthread_concurrency) {
// Create more workers if needed.
- bthread::FLAGS_bthread_concurrency +=
- c->add_workers(num - bthread::FLAGS_bthread_concurrency);
- return 0;
+ auto added = bthread::add_workers_for_each_tag(num -
bthread::FLAGS_bthread_concurrency);
+ bthread::FLAGS_bthread_concurrency += added;
}
return (num == bthread::FLAGS_bthread_concurrency ? 0 : EPERM);
}
+int bthread_getconcurrency_by_tag(bthread_tag_t tag) { // Returns the value TaskControl::concurrency(tag) reports for `tag`, read under g_task_control_mutex.
+ BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
+ auto c = bthread::get_task_control();
+ if (c == NULL) {
+ return EPERM; // NOTE(review): an errno value is returned through the same channel as a worker count, so callers cannot tell the two apart - confirm intent
+ }
+ return c->concurrency(tag); // NOTE(review): tag is used as an index without a range check here - confirm callers validate it
+}
+
+int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag) { // Tries to raise the worker count of `tag` to `num`; returns 0 on success and EPERM otherwise.
+ if (bthread::never_set_bthread_concurrency_by_tag) { // the very first call only clears this flag and reports success without touching any worker
+ bthread::never_set_bthread_concurrency_by_tag = false; // presumably absorbs the validator call made when the gflag is registered - TODO confirm
+ return 0;
+ }
+ BAIDU_SCOPED_LOCK(bthread::g_task_control_mutex);
+ auto c = bthread::get_task_control();
+ if (c == NULL) {
+ return EPERM;
+ }
+ auto ngroup = c->concurrency(); // concurrency summed over all tags
+ auto tag_ngroup = c->concurrency(tag); // current value for this tag; NOTE(review): tag is not range-checked before being used as an index - confirm
+ auto add = num - tag_ngroup; // workers still missing for this tag; zero or negative when num does not exceed the current value
+ if (ngroup + add > bthread::FLAGS_bthread_concurrency) { // growing this tag must not push the total past the global limit
+ LOG(ERROR) << "Fail to set concurrency by tag " << tag
+ << ", Whole concurrency larger than bthread_concurrency";
+ return EPERM;
+ }
+ auto added = 0;
+ if (add > 0) {
+ added = c->add_workers(add, tag);
+ return (add == added ? 0 : EPERM); // success only if every requested worker was created
+ }
+ return (num == tag_ngroup ? 0 : EPERM); // nothing to add: an unchanged value succeeds, a smaller one is refused
+}
+
int bthread_about_to_quit() {
bthread::TaskGroup* g = bthread::tls_task_group;
if (g != NULL) {
@@ -384,6 +473,14 @@ int bthread_set_worker_startfn(void (*start_fn)()) {
return 0;
}
+int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t)) { // Installs the callback that run_tagged_worker_startfn invokes with a worker tag; returns 0, or EINVAL for a NULL callback.
+ if (start_fn == NULL) {
+ return EINVAL;
+ }
+ bthread::g_tagged_worker_startfn = start_fn; // plain store to the global function pointer, no locking in this function
+ return 0;
+}
+
void bthread_stop_world() {
bthread::TaskControl* c = bthread::get_task_control();
if (c != NULL) {
@@ -433,5 +530,10 @@ int bthread_list_join(bthread_list_t* list) {
static_cast<bthread::TidList*>(list->impl)->apply(bthread::TidJoiner());
return 0;
}
-
+
+bthread_tag_t bthread_self_tag(void) { // Returns the tag of the thread-local TaskGroup of the calling thread.
+ return bthread::tls_task_group != nullptr ? bthread::tls_task_group->tag()
: BTHREAD_TAG_DEFAULT; // falls back to the default tag when the caller has no thread-local TaskGroup
+}
+
} // extern "C"
diff --git a/src/bthread/bthread.h b/src/bthread/bthread.h
index 3f55eb67..f91bc9af 100644
--- a/src/bthread/bthread.h
+++ b/src/bthread/bthread.h
@@ -146,6 +146,12 @@ extern int bthread_getconcurrency(void);
// NOTE: currently concurrency cannot be reduced after any bthread created.
extern int bthread_setconcurrency(int num);
+// Get number of worker pthreads by tag
+extern int bthread_getconcurrency_by_tag(bthread_tag_t tag);
+
+// Set number of worker pthreads to `num' for specified tag
+extern int bthread_setconcurrency_by_tag(int num, bthread_tag_t tag);
+
// Yield processor to another bthread.
// Notice that current implementation is not fair, which means that
// even if bthread_yield() is called, suspended threads may still starve.
@@ -327,6 +333,9 @@ extern int bthread_setspecific(bthread_key_t key, void*
data);
// If the key is invalid or deleted, return NULL.
extern void* bthread_getspecific(bthread_key_t key);
+// Return current bthread tag
+extern bthread_tag_t bthread_self_tag(void);
+
__END_DECLS
#endif // BTHREAD_BTHREAD_H
diff --git a/src/bthread/butex.cpp b/src/bthread/butex.cpp
index 19b03725..5ac44e1b 100644
--- a/src/bthread/butex.cpp
+++ b/src/bthread/butex.cpp
@@ -273,15 +273,16 @@ void butex_destroy(void* butex) {
}
inline TaskGroup* get_task_group(TaskControl* c, bool nosignal = false) {
- TaskGroup* g;
+ TaskGroup* g = tls_task_group;
if (nosignal) {
- g = tls_task_group_nosignal;
- if (NULL == g) {
- g = c->choose_one_group();
+ if (NULL == tls_task_group_nosignal) {
+ g = g ? g : c->choose_one_group();
tls_task_group_nosignal = g;
+ } else {
+ g = tls_task_group_nosignal;
}
} else {
- g = tls_task_group ? tls_task_group : c->choose_one_group();
+ g = g ? g : c->choose_one_group();
}
return g;
}
diff --git a/src/bthread/task_control.cpp b/src/bthread/task_control.cpp
index 15f1d7b6..d0549ea9 100644
--- a/src/bthread/task_control.cpp
+++ b/src/bthread/task_control.cpp
@@ -39,6 +39,7 @@ DEFINE_int32(task_group_runqueue_capacity, 4096,
"capacity of runqueue in each TaskGroup");
DEFINE_int32(task_group_yield_before_idle, 0,
"TaskGroup yields so many times before idle");
+DEFINE_int32(task_group_ntags, 1, "TaskGroup will be grouped by number ntags");
namespace bthread {
@@ -48,6 +49,7 @@ DECLARE_int32(bthread_min_concurrency);
extern pthread_mutex_t g_task_control_mutex;
extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group;
void (*g_worker_startfn)() = NULL;
+void (*g_tagged_worker_startfn)(bthread_tag_t) = NULL;
// May be called in other modules to run startfn in non-worker pthreads.
void run_worker_startfn() {
@@ -56,28 +58,44 @@ void run_worker_startfn() {
}
}
+void run_tagged_worker_startfn(bthread_tag_t tag) { // Calls the tagged start callback with `tag` if one has been installed; otherwise does nothing.
+ if (g_tagged_worker_startfn) {
+ g_tagged_worker_startfn(tag);
+ }
+}
+
+struct WorkerThreadArgs { // Argument bundle handed to TaskControl::worker_thread through pthread_create.
+ WorkerThreadArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), tag(_t) {}
+ TaskControl* c; // the control that spawned the worker
+ bthread_tag_t tag; // tag the new worker creates its TaskGroup with
+}; // allocated with new by the creator; worker_thread deletes it after copying both fields, and the creator deletes it when pthread_create fails
+
void* TaskControl::worker_thread(void* arg) {
- run_worker_startfn();
+ run_worker_startfn();
#ifdef BAIDU_INTERNAL
logging::ComlogInitializer comlog_initializer;
#endif
-
- TaskControl* c = static_cast<TaskControl*>(arg);
- TaskGroup* g = c->create_group();
+
+ auto dummy = static_cast<WorkerThreadArgs*>(arg);
+ auto c = dummy->c;
+ auto tag = dummy->tag;
+ delete dummy;
+ run_tagged_worker_startfn(tag);
+
+ TaskGroup* g = c->create_group(tag);
TaskStatistics stat;
if (NULL == g) {
LOG(ERROR) << "Fail to create TaskGroup in pthread=" << pthread_self();
return NULL;
}
std::string worker_thread_name = butil::string_printf(
- "brpc_worker:%d",
- c->_next_worker_id.fetch_add(1, butil::memory_order_relaxed));
+ "brpc_wkr:%d-%d", g->tag(), c->_next_worker_id.fetch_add(1,
butil::memory_order_relaxed));
butil::PlatformThread::SetName(worker_thread_name.c_str());
- BT_VLOG << "Created worker=" << pthread_self()
- << " bthread=" << g->main_tid();
-
+ BT_VLOG << "Created worker=" << pthread_self() << " bthread=" <<
g->main_tid()
+ << " tag=" << g->tag();
tls_task_group = g;
c->_nworkers << 1;
+ c->tag_nworkers(g->tag()) << 1;
g->run_main_task();
stat = g->main_stat();
@@ -87,10 +105,11 @@ void* TaskControl::worker_thread(void* arg) {
tls_task_group = NULL;
g->destroy_self();
c->_nworkers << -1;
+ c->tag_nworkers(g->tag()) << -1;
return NULL;
}
-TaskGroup* TaskControl::create_group() {
+TaskGroup* TaskControl::create_group(bthread_tag_t tag) {
TaskGroup* g = new (std::nothrow) TaskGroup(this);
if (NULL == g) {
LOG(FATAL) << "Fail to new TaskGroup";
@@ -101,7 +120,7 @@ TaskGroup* TaskControl::create_group() {
delete g;
return NULL;
}
- if (_add_group(g) != 0) {
+ if (_add_group(g, tag) != 0) {
delete g;
return NULL;
}
@@ -117,6 +136,19 @@ static double get_cumulated_worker_time_from_this(void
*arg) {
return static_cast<TaskControl*>(arg)->get_cumulated_worker_time();
}
+struct CumulatedWithTagArgs { // Pairs a TaskControl with a tag so a void* callback can query per-tag worker time.
+ CumulatedWithTagArgs(TaskControl* _c, bthread_tag_t _t) : c(_c), t(_t) {}
+ TaskControl* c; // control to query
+ bthread_tag_t t; // tag whose cumulated worker time is wanted
+}; // NOTE(review): TaskControl::init allocates one per tag with new and no matching delete is visible in this patch - confirm lifetime
+
+static double get_cumulated_worker_time_from_this_with_tag(void* arg) { // Callback given to the per-tag bvar::PassiveStatus<double> in TaskControl::init; `arg` must point to a CumulatedWithTagArgs.
+ auto a = static_cast<CumulatedWithTagArgs*>(arg);
+ auto c = a->c;
+ auto t = a->t;
+ return c->get_cumulated_worker_time_with_tag(t); // `arg` is only read, never freed here
+}
+
static int64_t get_cumulated_switch_count_from_this(void *arg) {
return static_cast<TaskControl*>(arg)->get_cumulated_switch_count();
}
@@ -127,8 +159,9 @@ static int64_t get_cumulated_signal_count_from_this(void
*arg) {
TaskControl::TaskControl()
// NOTE: all fileds must be initialized before the vars.
- : _ngroup(0)
- , _groups((TaskGroup**)calloc(BTHREAD_MAX_CONCURRENCY, sizeof(TaskGroup*)))
+ : _tagged_ngroup(FLAGS_task_group_ntags)
+ , _tagged_groups(FLAGS_task_group_ntags)
+ , _init(false)
, _stop(false)
, _concurrency(0)
, _next_worker_id(0)
@@ -144,10 +177,8 @@ TaskControl::TaskControl()
, _signal_per_second(&_cumulated_signal_count)
, _status(print_rq_sizes_in_the_tc, this)
, _nbthreads("bthread_count")
-{
- // calloc shall set memory to zero
- CHECK(_groups) << "Fail to create array of groups";
-}
+ , _pl(FLAGS_task_group_ntags)
+{}
int TaskControl::init(int concurrency) {
if (_concurrency != 0) {
@@ -160,6 +191,18 @@ int TaskControl::init(int concurrency) {
}
_concurrency = concurrency;
+ // Set up per-tag state: reset each tag's group counter and create its bvars
+ for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
+ _tagged_ngroup[i].store(0, std::memory_order_relaxed);
+ auto tag_str = std::to_string(i);
+ _tagged_nworkers.push_back(new
bvar::Adder<int64_t>("bthread_worker_count", tag_str));
+ _tagged_cumulated_worker_time.push_back(new
bvar::PassiveStatus<double>(
+ get_cumulated_worker_time_from_this_with_tag, new
CumulatedWithTagArgs{this, i}));
+ _tagged_worker_usage_second.push_back(new
bvar::PerSecond<bvar::PassiveStatus<double>>(
+ "bthread_worker_usage", tag_str, _tagged_cumulated_worker_time[i],
1));
+ _tagged_nbthreads.push_back(new bvar::Adder<int64_t>("bthread_count",
tag_str));
+ }
+
// Make sure TimerThread is ready.
if (get_or_create_global_timer_thread() == NULL) {
LOG(ERROR) << "Fail to get global_timer_thread";
@@ -168,8 +211,10 @@ int TaskControl::init(int concurrency) {
_workers.resize(_concurrency);
for (int i = 0; i < _concurrency; ++i) {
- const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
+ auto arg = new WorkerThreadArgs(this, i % FLAGS_task_group_ntags);
+ const int rc = pthread_create(&_workers[i], NULL, worker_thread, arg);
if (rc) {
+ delete arg;
LOG(ERROR) << "Fail to create _workers[" << i << "], " <<
berror(rc);
return -1;
}
@@ -182,13 +227,20 @@ int TaskControl::init(int concurrency) {
// Wait for at least one group is added so that choose_one_group()
// never returns NULL.
// TODO: Handle the case that worker quits before add_group
- while (_ngroup == 0) {
- usleep(100); // TODO: Elaborate
+ for (int i = 0; i < FLAGS_task_group_ntags;) {
+ if (_tagged_ngroup[i].load(std::memory_order_acquire) == 0) {
+ usleep(100); // TODO: Elaborate
+ continue;
+ }
+ ++i;
}
+
+ _init.store(true, butil::memory_order_release);
+
return 0;
}
-int TaskControl::add_workers(int num) {
+int TaskControl::add_workers(int num, bthread_tag_t tag) {
if (num <= 0) {
return 0;
}
@@ -202,9 +254,11 @@ int TaskControl::add_workers(int num) {
// Worker will add itself to _idle_workers, so we have to add
// _concurrency before create a worker.
_concurrency.fetch_add(1);
+ auto arg = new WorkerThreadArgs(this, tag);
const int rc = pthread_create(
- &_workers[i + old_concurency], NULL, worker_thread, this);
+ &_workers[i + old_concurency], NULL, worker_thread, arg);
if (rc) {
+ delete arg;
LOG(WARNING) << "Fail to create _workers[" << i + old_concurency
<< "], " << berror(rc);
_concurrency.fetch_sub(1, butil::memory_order_release);
@@ -216,10 +270,12 @@ int TaskControl::add_workers(int num) {
return _concurrency.load(butil::memory_order_relaxed) - old_concurency;
}
-TaskGroup* TaskControl::choose_one_group() {
- const size_t ngroup = _ngroup.load(butil::memory_order_acquire);
+TaskGroup* TaskControl::choose_one_group(bthread_tag_t tag) {
+ CHECK(tag >= BTHREAD_TAG_DEFAULT && tag < FLAGS_task_group_ntags);
+ auto& groups = tag_group(tag);
+ const auto ngroup = tag_ngroup(tag).load(butil::memory_order_acquire);
if (ngroup != 0) {
- return _groups[butil::fast_rand_less_than(ngroup)];
+ return groups[butil::fast_rand_less_than(ngroup)];
}
CHECK(false) << "Impossible: ngroup is 0";
return NULL;
@@ -236,10 +292,14 @@ void TaskControl::stop_and_join() {
{
BAIDU_SCOPED_LOCK(_modify_group_mutex);
_stop = true;
- _ngroup.exchange(0, butil::memory_order_relaxed);
+ std::for_each(
+ _tagged_ngroup.begin(), _tagged_ngroup.end(),
+ [](butil::atomic<size_t>& index) { index.store(0,
butil::memory_order_relaxed); });
}
- for (int i = 0; i < PARKING_LOT_NUM; ++i) {
- _pl[i].stop();
+ for (int i = 0; i < FLAGS_task_group_ntags; ++i) {
+ for (auto& pl : _pl[i]) {
+ pl.stop();
+ }
}
// Interrupt blocking operations.
for (size_t i = 0; i < _workers.size(); ++i) {
@@ -261,12 +321,9 @@ TaskControl::~TaskControl() {
_status.hide();
stop_and_join();
-
- free(_groups);
- _groups = NULL;
}
-int TaskControl::_add_group(TaskGroup* g) {
+int TaskControl::_add_group(TaskGroup* g, bthread_tag_t tag) {
if (__builtin_expect(NULL == g, 0)) {
return -1;
}
@@ -274,15 +331,17 @@ int TaskControl::_add_group(TaskGroup* g) {
if (_stop) {
return -1;
}
- size_t ngroup = _ngroup.load(butil::memory_order_relaxed);
+ g->set_tag(tag);
+ g->set_pl(&_pl[tag][butil::fmix64(pthread_numeric_id()) %
PARKING_LOT_NUM]);
+ size_t ngroup = _tagged_ngroup[tag].load(butil::memory_order_relaxed);
if (ngroup < (size_t)BTHREAD_MAX_CONCURRENCY) {
- _groups[ngroup] = g;
- _ngroup.store(ngroup + 1, butil::memory_order_release);
+ _tagged_groups[tag][ngroup] = g;
+ _tagged_ngroup[tag].store(ngroup + 1, butil::memory_order_release);
}
mu.unlock();
// See the comments in _destroy_group
// TODO: Not needed anymore since non-worker pthread cannot have TaskGroup
- signal_task(65536);
+ // signal_task(65536, tag);
return 0;
}
@@ -303,11 +362,13 @@ int TaskControl::_destroy_group(TaskGroup* g) {
bool erased = false;
{
BAIDU_SCOPED_LOCK(_modify_group_mutex);
- const size_t ngroup = _ngroup.load(butil::memory_order_relaxed);
+ auto tag = g->tag();
+ auto& groups = tag_group(tag);
+ const size_t ngroup =
tag_ngroup(tag).load(butil::memory_order_relaxed);
for (size_t i = 0; i < ngroup; ++i) {
- if (_groups[i] == g) {
+ if (groups[i] == g) {
// No need for atomic_thread_fence because lock did it.
- _groups[i] = _groups[ngroup - 1];
+ groups[i] = groups[ngroup - 1];
// Change _ngroup and keep _groups unchanged at last so that:
// - If steal_task sees the newest _ngroup, it would not touch
// _groups[ngroup -1]
@@ -317,7 +378,7 @@ int TaskControl::_destroy_group(TaskGroup* g) {
// overwrite it, since we do signal_task in _add_group(),
// we think the pending tasks of _groups[ngroup - 1] would
// not miss.
- _ngroup.store(ngroup - 1, butil::memory_order_release);
+ tag_ngroup(tag).store(ngroup - 1, butil::memory_order_release);
//_groups[ngroup - 1] = NULL;
erased = true;
break;
@@ -339,9 +400,10 @@ int TaskControl::_destroy_group(TaskGroup* g) {
}
bool TaskControl::steal_task(bthread_t* tid, size_t* seed, size_t offset) {
+ auto tag = tls_task_group->tag();
// 1: Acquiring fence is paired with releasing fence in _add_group to
// avoid accessing uninitialized slot of _groups.
- const size_t ngroup = _ngroup.load(butil::memory_order_acquire/*1*/);
+ const size_t ngroup =
tag_ngroup(tag).load(butil::memory_order_acquire/*1*/);
if (0 == ngroup) {
return false;
}
@@ -349,8 +411,9 @@ bool TaskControl::steal_task(bthread_t* tid, size_t* seed,
size_t offset) {
// NOTE: Don't return inside `for' iteration since we need to update |seed|
bool stolen = false;
size_t s = *seed;
+ auto& groups = tag_group(tag);
for (size_t i = 0; i < ngroup; ++i, s += offset) {
- TaskGroup* g = _groups[s % ngroup];
+ TaskGroup* g = groups[s % ngroup];
// g is possibly NULL because of concurrent _destroy_group
if (g) {
if (g->_rq.steal(tid)) {
@@ -367,7 +430,7 @@ bool TaskControl::steal_task(bthread_t* tid, size_t* seed,
size_t offset) {
return stolen;
}
-void TaskControl::signal_task(int num_task) {
+void TaskControl::signal_task(int num_task, bthread_tag_t tag) {
if (num_task <= 0) {
return;
}
@@ -378,14 +441,15 @@ void TaskControl::signal_task(int num_task) {
if (num_task > 2) {
num_task = 2;
}
+ auto& pl = tag_pl(tag);
int start_index = butil::fmix64(pthread_numeric_id()) % PARKING_LOT_NUM;
- num_task -= _pl[start_index].signal(1);
+ num_task -= pl[start_index].signal(1);
if (num_task > 0) {
for (int i = 1; i < PARKING_LOT_NUM && num_task > 0; ++i) {
if (++start_index >= PARKING_LOT_NUM) {
start_index = 0;
}
- num_task -= _pl[start_index].signal(1);
+ num_task -= pl[start_index].signal(1);
}
}
if (num_task > 0 &&
@@ -394,21 +458,26 @@ void TaskControl::signal_task(int num_task) {
// TODO: Reduce this lock
BAIDU_SCOPED_LOCK(g_task_control_mutex);
if (_concurrency.load(butil::memory_order_acquire) <
FLAGS_bthread_concurrency) {
- add_workers(1);
+ add_workers(1, tag);
}
}
}
void TaskControl::print_rq_sizes(std::ostream& os) {
- const size_t ngroup = _ngroup.load(butil::memory_order_relaxed);
+ size_t ngroup = 0;
+ std::for_each(_tagged_ngroup.begin(), _tagged_ngroup.end(),
[&](butil::atomic<size_t>& index) {
+ ngroup += index.load(butil::memory_order_relaxed);
+ });
DEFINE_SMALL_ARRAY(int, nums, ngroup, 128);
{
BAIDU_SCOPED_LOCK(_modify_group_mutex);
// ngroup > _ngroup: nums[_ngroup ... ngroup-1] = 0
// ngroup < _ngroup: just ignore _groups[_ngroup ... ngroup-1]
- for (size_t i = 0; i < ngroup; ++i) {
- nums[i] = (_groups[i] ? _groups[i]->_rq.volatile_size() : 0);
- }
+ int i = 0;
+ for_each_task_group([&](TaskGroup* g) {
+ nums[i] = (g ? g->_rq.volatile_size() : 0);
+ ++i;
+ });
}
for (size_t i = 0; i < ngroup; ++i) {
os << nums[i] << ' ';
@@ -418,10 +487,22 @@ void TaskControl::print_rq_sizes(std::ostream& os) {
double TaskControl::get_cumulated_worker_time() {
int64_t cputime_ns = 0;
BAIDU_SCOPED_LOCK(_modify_group_mutex);
- const size_t ngroup = _ngroup.load(butil::memory_order_relaxed);
+ for_each_task_group([&](TaskGroup* g) {
+ if (g) {
+ cputime_ns += g->_cumulated_cputime_ns;
+ }
+ });
+ return cputime_ns / 1000000000.0;
+}
+
+double TaskControl::get_cumulated_worker_time_with_tag(bthread_tag_t tag) {
+ int64_t cputime_ns = 0;
+ BAIDU_SCOPED_LOCK(_modify_group_mutex);
+ const size_t ngroup = tag_ngroup(tag).load(butil::memory_order_relaxed);
+ auto& groups = tag_group(tag);
for (size_t i = 0; i < ngroup; ++i) {
- if (_groups[i]) {
- cputime_ns += _groups[i]->_cumulated_cputime_ns;
+ if (groups[i]) {
+ cputime_ns += groups[i]->_cumulated_cputime_ns;
}
}
return cputime_ns / 1000000000.0;
@@ -430,25 +511,22 @@ double TaskControl::get_cumulated_worker_time() {
int64_t TaskControl::get_cumulated_switch_count() {
int64_t c = 0;
BAIDU_SCOPED_LOCK(_modify_group_mutex);
- const size_t ngroup = _ngroup.load(butil::memory_order_relaxed);
- for (size_t i = 0; i < ngroup; ++i) {
- if (_groups[i]) {
- c += _groups[i]->_nswitch;
+ for_each_task_group([&](TaskGroup* g) {
+ if (g) {
+ c += g->_nswitch;
}
- }
+ });
return c;
}
int64_t TaskControl::get_cumulated_signal_count() {
int64_t c = 0;
BAIDU_SCOPED_LOCK(_modify_group_mutex);
- const size_t ngroup = _ngroup.load(butil::memory_order_relaxed);
- for (size_t i = 0; i < ngroup; ++i) {
- TaskGroup* g = _groups[i];
+ for_each_task_group([&](TaskGroup* g) {
if (g) {
c += g->_nsignaled + g->_remote_nsignaled;
}
- }
+ });
return c;
}
diff --git a/src/bthread/task_control.h b/src/bthread/task_control.h
index e318c265..a19636ac 100644
--- a/src/bthread/task_control.h
+++ b/src/bthread/task_control.h
@@ -26,6 +26,9 @@
#include <iostream> // std::ostream
#endif
#include <stddef.h> // size_t
+#include <vector>
+#include <array>
+#include <memory>
#include "butil/atomicops.h" // butil::atomic
#include "bvar/bvar.h" // bvar::PassiveStatus
#include "bthread/task_meta.h" // TaskMeta
@@ -33,6 +36,7 @@
#include "bthread/work_stealing_queue.h" // WorkStealingQueue
#include "bthread/parking_lot.h"
+DECLARE_int32(task_group_ntags);
namespace bthread {
class TaskGroup;
@@ -49,13 +53,13 @@ public:
int init(int nconcurrency);
// Create a TaskGroup in this control.
- TaskGroup* create_group();
+ TaskGroup* create_group(bthread_tag_t tag);
// Steal a task from a "random" group.
bool steal_task(bthread_t* tid, size_t* seed, size_t offset);
// Tell other groups that `n' tasks was just added to caller's runqueue
- void signal_task(int num_task);
+ void signal_task(int num_task, bthread_tag_t tag);
// Stop and join worker threads in TaskControl.
void stop_and_join();
@@ -64,37 +68,59 @@ public:
int concurrency() const
{ return _concurrency.load(butil::memory_order_acquire); }
+ int concurrency(bthread_tag_t tag) const
+ { return _tagged_ngroup[tag].load(butil::memory_order_acquire); }
+
void print_rq_sizes(std::ostream& os);
double get_cumulated_worker_time();
+ double get_cumulated_worker_time_with_tag(bthread_tag_t tag);
int64_t get_cumulated_switch_count();
int64_t get_cumulated_signal_count();
// [Not thread safe] Add more worker threads.
// Return the number of workers actually added, which may be less than
|num|
- int add_workers(int num);
+ int add_workers(int num, bthread_tag_t tag);
// Choose one TaskGroup (randomly right now).
// If this method is called after init(), it never returns NULL.
- TaskGroup* choose_one_group();
+ TaskGroup* choose_one_group(bthread_tag_t tag = BTHREAD_TAG_DEFAULT);
private:
+ typedef std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY> TaggedGroups;
+ static const int PARKING_LOT_NUM = 4;
+ typedef std::array<ParkingLot, PARKING_LOT_NUM> TaggedParkingLot;
// Add/Remove a TaskGroup.
// Returns 0 on success, -1 otherwise.
- int _add_group(TaskGroup*);
+ int _add_group(TaskGroup*, bthread_tag_t tag);
int _destroy_group(TaskGroup*);
+ // Tag group
+ TaggedGroups& tag_group(bthread_tag_t tag) { return _tagged_groups[tag]; }
+
+ // Tag ngroup
+ butil::atomic<size_t>& tag_ngroup(int tag) { return _tagged_ngroup[tag]; }
+
+ // Tag parking slot
+ TaggedParkingLot& tag_pl(bthread_tag_t tag) { return _pl[tag]; }
+
static void delete_task_group(void* arg);
static void* worker_thread(void* task_control);
+ template <typename F>
+ void for_each_task_group(F const& f);
+
bvar::LatencyRecorder& exposed_pending_time();
bvar::LatencyRecorder* create_exposed_pending_time();
+ bvar::Adder<int64_t>& tag_nworkers(bthread_tag_t tag);
+ bvar::Adder<int64_t>& tag_nbthreads(bthread_tag_t tag);
- butil::atomic<size_t> _ngroup;
- TaskGroup** _groups;
+ std::vector<butil::atomic<size_t>> _tagged_ngroup;
+ std::vector<TaggedGroups> _tagged_groups;
butil::Mutex _modify_group_mutex;
+ butil::atomic<bool> _init; // if not init, bvar will cause coredump
bool _stop;
butil::atomic<int> _concurrency;
std::vector<pthread_t> _workers;
@@ -112,8 +138,12 @@ private:
bvar::PassiveStatus<std::string> _status;
bvar::Adder<int64_t> _nbthreads;
- static const int PARKING_LOT_NUM = 4;
- ParkingLot _pl[PARKING_LOT_NUM];
+ std::vector<bvar::Adder<int64_t>*> _tagged_nworkers;
+ std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time;
+ std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*>
_tagged_worker_usage_second;
+ std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads;
+
+ std::vector<TaggedParkingLot> _pl;
};
inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() {
@@ -124,6 +154,28 @@ inline bvar::LatencyRecorder&
TaskControl::exposed_pending_time() {
return *pt;
}
+inline bvar::Adder<int64_t>& TaskControl::tag_nworkers(bthread_tag_t tag) { // Per-tag worker counter; the Adder objects are created in TaskControl::init.
+ return *_tagged_nworkers[tag]; // tag is used as an unchecked index into the vector of Adder pointers
+}
+
+inline bvar::Adder<int64_t>& TaskControl::tag_nbthreads(bthread_tag_t tag) { // Per-tag bthread counter; the Adder objects are created in TaskControl::init.
+ return *_tagged_nbthreads[tag]; // tag is used as an unchecked index into the vector of Adder pointers
+}
+
+template <typename F>
+inline void TaskControl::for_each_task_group(F const& f) { // Invokes f(TaskGroup*) on every occupied group slot of every tag, in tag order.
+ if (_init.load(butil::memory_order_acquire) == false) { // does nothing until init() has stored true into _init
+ return;
+ }
+ for (size_t i = 0; i < _tagged_groups.size(); ++i) {
+ auto ngroup = tag_ngroup(i).load(butil::memory_order_relaxed); // number of occupied slots for tag i
+ auto& groups = tag_group(i);
+ for (size_t j = 0; j < ngroup; ++j) {
+ f(groups[j]); // no lock is taken here; the callers in task_control.cpp hold _modify_group_mutex around the call
+ }
+ }
+}
+
} // namespace bthread
#endif // BTHREAD_TASK_CONTROL_H
diff --git a/src/bthread/task_group.cpp b/src/bthread/task_group.cpp
index 0a785601..1c2fd522 100644
--- a/src/bthread/task_group.cpp
+++ b/src/bthread/task_group.cpp
@@ -40,7 +40,7 @@
namespace bthread {
static const bthread_attr_t BTHREAD_ATTR_TASKGROUP = {
- BTHREAD_STACKTYPE_UNKNOWN, 0, NULL };
+ BTHREAD_STACKTYPE_UNKNOWN, 0, NULL, BTHREAD_TAG_DEFAULT };
static bool pass_bool(const char*, bool) { return true; }
@@ -192,10 +192,10 @@ TaskGroup::TaskGroup(TaskControl* c)
#ifndef NDEBUG
, _sched_recursive_guard(0)
#endif
+ , _tag(BTHREAD_TAG_DEFAULT)
{
_steal_seed = butil::fast_rand();
_steal_offset = OFFSET_TABLE[_steal_seed % ARRAY_SIZE(OFFSET_TABLE)];
- _pl = &c->_pl[butil::fmix64(pthread_numeric_id()) %
TaskControl::PARKING_LOT_NUM];
CHECK(c);
}
@@ -335,6 +335,7 @@ void TaskGroup::task_runner(intptr_t skip_remained) {
butex_wake_except(m->version_butex, 0);
g->_control->_nbthreads << -1;
+ g->_control->tag_nbthreads(g->tag()) << -1;
g->set_remained(TaskGroup::_release_last_context, m);
ending_sched(&g);
@@ -392,6 +393,7 @@ int TaskGroup::start_foreground(TaskGroup** pg,
TaskGroup* g = *pg;
g->_control->_nbthreads << 1;
+ g->_control->tag_nbthreads(g->tag()) << 1;
if (g->is_current_pthread_task()) {
// never create foreground task in pthread.
g->ready_to_run(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
@@ -448,6 +450,7 @@ int TaskGroup::start_background(bthread_t* __restrict th,
LOG(INFO) << "Started bthread " << m->tid;
}
_control->_nbthreads << 1;
+ _control->tag_nbthreads(tag()) << 1;
if (REMOTE) {
ready_to_run_remote(m->tid, (using_attr.flags & BTHREAD_NOSIGNAL));
} else {
@@ -658,7 +661,7 @@ void TaskGroup::ready_to_run(bthread_t tid, bool nosignal) {
const int additional_signal = _num_nosignal;
_num_nosignal = 0;
_nsignaled += 1 + additional_signal;
- _control->signal_task(1 + additional_signal);
+ _control->signal_task(1 + additional_signal, _tag);
}
}
@@ -667,7 +670,7 @@ void TaskGroup::flush_nosignal_tasks() {
if (val) {
_num_nosignal = 0;
_nsignaled += val;
- _control->signal_task(val);
+ _control->signal_task(val, _tag);
}
}
@@ -688,7 +691,7 @@ void TaskGroup::ready_to_run_remote(bthread_t tid, bool nosignal) {
_remote_num_nosignal = 0;
_remote_nsignaled += 1 + additional_signal;
_remote_rq._mutex.unlock();
- _control->signal_task(1 + additional_signal);
+ _control->signal_task(1 + additional_signal, _tag);
}
}
@@ -701,7 +704,7 @@ void TaskGroup::flush_nosignal_tasks_remote_locked(butil::Mutex& locked_mutex) {
_remote_num_nosignal = 0;
_remote_nsignaled += val;
locked_mutex.unlock();
- _control->signal_task(val);
+ _control->signal_task(val, _tag);
}
void TaskGroup::ready_to_run_general(bthread_t tid, bool nosignal) {
@@ -738,7 +741,9 @@ struct SleepArgs {
static void ready_to_run_from_timer_thread(void* arg) {
CHECK(tls_task_group == NULL);
const SleepArgs* e = static_cast<const SleepArgs*>(arg);
- e->group->control()->choose_one_group()->ready_to_run_remote(e->tid);
+ auto g = e->group;
+ auto tag = g->tag();
+ g->control()->choose_one_group(tag)->ready_to_run_remote(e->tid);
}
void TaskGroup::_add_sleep_event(void* void_args) {
diff --git a/src/bthread/task_group.h b/src/bthread/task_group.h
index 2a1bb2a9..d8598678 100644
--- a/src/bthread/task_group.h
+++ b/src/bthread/task_group.h
@@ -182,6 +182,8 @@ public:
// process make go on indefinitely.
void push_rq(bthread_t tid);
+ bthread_tag_t tag() const { return _tag; }
+
private:
friend class TaskControl;
@@ -221,6 +223,10 @@ friend class TaskControl;
return _control->steal_task(tid, &_steal_seed, _steal_offset);
}
+ void set_tag(bthread_tag_t tag) { _tag = tag; }
+
+ void set_pl(ParkingLot* pl) { _pl = pl; }
+
TaskMeta* _cur_meta;
// the control that this group belongs to
@@ -249,6 +255,8 @@ friend class TaskControl;
int _remote_nsignaled;
int _sched_recursive_guard;
+ // tag of this taskgroup
+ bthread_tag_t _tag;
};
} // namespace bthread
diff --git a/src/bthread/types.h b/src/bthread/types.h
index a84e4793..e3fdaa8f 100644
--- a/src/bthread/types.h
+++ b/src/bthread/types.h
@@ -32,6 +32,10 @@ typedef uint64_t bthread_t;
// tid returned by bthread_start_* never equals this value.
static const bthread_t INVALID_BTHREAD = 0;
+// bthread tag default is 0
+typedef int bthread_tag_t;
+static const bthread_tag_t BTHREAD_TAG_DEFAULT = 0;
+
struct sockaddr;
typedef unsigned bthread_stacktype_t;
@@ -93,12 +97,14 @@ typedef struct bthread_attr_t {
bthread_stacktype_t stack_type;
bthread_attrflags_t flags;
bthread_keytable_pool_t* keytable_pool;
+ bthread_tag_t tag;
#if defined(__cplusplus)
void operator=(unsigned stacktype_and_flags) {
stack_type = (stacktype_and_flags & 7);
flags = (stacktype_and_flags & ~(unsigned)7u);
keytable_pool = NULL;
+ tag = BTHREAD_TAG_DEFAULT;
}
bthread_attr_t operator|(unsigned other_flags) const {
CHECK(!(other_flags & 7)) << "flags=" << other_flags;
@@ -116,24 +122,22 @@ typedef struct bthread_attr_t {
// obvious drawback is that you need more worker pthreads when you have a lot
// of such bthreads.
static const bthread_attr_t BTHREAD_ATTR_PTHREAD =
-{ BTHREAD_STACKTYPE_PTHREAD, 0, NULL };
+{ BTHREAD_STACKTYPE_PTHREAD, 0, NULL, BTHREAD_TAG_DEFAULT };
// bthreads created with following attributes will have different size of
// stacks. Default is BTHREAD_ATTR_NORMAL.
-static const bthread_attr_t BTHREAD_ATTR_SMALL =
-{ BTHREAD_STACKTYPE_SMALL, 0, NULL };
-static const bthread_attr_t BTHREAD_ATTR_NORMAL =
-{ BTHREAD_STACKTYPE_NORMAL, 0, NULL };
-static const bthread_attr_t BTHREAD_ATTR_LARGE =
-{ BTHREAD_STACKTYPE_LARGE, 0, NULL };
+static const bthread_attr_t BTHREAD_ATTR_SMALL = {BTHREAD_STACKTYPE_SMALL, 0, NULL,
+ BTHREAD_TAG_DEFAULT};
+static const bthread_attr_t BTHREAD_ATTR_NORMAL = {BTHREAD_STACKTYPE_NORMAL, 0, NULL,
+ BTHREAD_TAG_DEFAULT};
+static const bthread_attr_t BTHREAD_ATTR_LARGE = {BTHREAD_STACKTYPE_LARGE, 0, NULL,
+ BTHREAD_TAG_DEFAULT};
// bthreads created with this attribute will print log when it's started,
// context-switched, finished.
static const bthread_attr_t BTHREAD_ATTR_DEBUG = {
- BTHREAD_STACKTYPE_NORMAL,
- BTHREAD_LOG_START_AND_FINISH | BTHREAD_LOG_CONTEXT_SWITCH,
- NULL
-};
+ BTHREAD_STACKTYPE_NORMAL, BTHREAD_LOG_START_AND_FINISH | BTHREAD_LOG_CONTEXT_SWITCH, NULL,
+ BTHREAD_TAG_DEFAULT};
static const size_t BTHREAD_EPOLL_THREAD_NUM = 1;
static const bthread_t BTHREAD_ATOMIC_INIT = 0;
diff --git a/src/bthread/unstable.h b/src/bthread/unstable.h
index 61f4b1ab..5836f60d 100644
--- a/src/bthread/unstable.h
+++ b/src/bthread/unstable.h
@@ -84,6 +84,9 @@ extern int bthread_connect(int sockfd, const struct sockaddr* serv_addr,
// Returns 0 on success, error code otherwise.
extern int bthread_set_worker_startfn(void (*start_fn)());
+// Add a startup function with tag
+extern int bthread_set_tagged_worker_startfn(void (*start_fn)(bthread_tag_t));
+
// Stop all bthread and worker pthreads.
// You should avoid calling this function which may cause bthread after main()
// suspend indefinitely.
diff --git a/test/bthread_setconcurrency_unittest.cpp b/test/bthread_setconcurrency_unittest.cpp
index a16c2a70..e2be4ca7 100644
--- a/test/bthread_setconcurrency_unittest.cpp
+++ b/test/bthread_setconcurrency_unittest.cpp
@@ -191,4 +191,35 @@ TEST(BthreadTest, min_concurrency) {
ASSERT_EQ(conn + add_conn, bthread::g_task_control->concurrency());
}
+int current_tag(int tag) {
+ std::stringstream ss;
+ ss << tag;
+ std::string ret = GFLAGS_NS::SetCommandLineOption("bthread_current_tag", ss.str().c_str());
+ return !(ret.empty());
+}
+
+TEST(BthreadTest, current_tag) {
+ ASSERT_EQ(false, current_tag(-1));
+ ASSERT_EQ(true, current_tag(0));
+ ASSERT_EQ(false, current_tag(1));
+}
+
+int concurrency_by_tag(int num) {
+ std::stringstream ss;
+ ss << num;
+ std::string ret =
+ GFLAGS_NS::SetCommandLineOption("bthread_concurrency_by_tag", ss.str().c_str());
+ return !(ret.empty());
+}
+
+TEST(BthreadTest, concurrency_by_tag) {
+ ASSERT_EQ(concurrency_by_tag(1), true);
+ ASSERT_EQ(concurrency_by_tag(1), false);
+ auto con = bthread_getconcurrency_by_tag(0);
+ ASSERT_EQ(concurrency_by_tag(con), true);
+ ASSERT_EQ(concurrency_by_tag(con + 1), false);
+ bthread_setconcurrency(con + 1);
+ ASSERT_EQ(concurrency_by_tag(con + 1), true);
+}
+
} // namespace
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]