Huixxi commented on code in PR #149:
URL: https://github.com/apache/brpc-website/pull/149#discussion_r1208320331
##########
content/zh/docs/blogs/sourcecodes/bthread_schedule/index.md:
##########
@@ -0,0 +1,341 @@
+---
+title: "bRPC源码解析·bthread调度执行流程"
+linkTitle: "bRPC源码解析·bthread调度执行流程"
+weight: 3
+date: 2023-05-23
+---
+(作者简介:KIDGINBROOK,在昆仑芯参与训练框架开发工作)
+
+## 整体流程
+task_group负责对bthread的调度执行,一个task_group对应一个pthread,内部有两个执行队列,分别为_rq和_remote_rq,执行队列中存放着待执行的bthread。task_control为全局单例,内部有多个task_group。
+
+
+### 主要接口
+
+#### TaskControl
+TaskControl是一个单例,下面是初始化的过程,主要逻辑即为创建_concurrency个worker(bthread_worker)线程,每个worker执行worker_thread函数
+```c++
+int TaskControl::init(int concurrency) {
+ _concurrency = concurrency;
+ _workers.resize(_concurrency);
+ for (int i = 0; i < _concurrency; ++i) {
+ const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
+ ...
+ }
+ ...
+}
+```
+worker_thread的逻辑为通过create_group创建一个TaskGroup
g,添加到TaskControl中,设置tls_task_group为g,tls_task_group为tls变量,因此只有worker线程的tls_task_group为非null,然后执行TaskGroup的run_main_task函数
+```c++
+void* TaskControl::worker_thread(void* arg) {
+ TaskControl* c = static_cast<TaskControl*>(arg);
+ TaskGroup* g = c->create_group();
+ ...
+ tls_task_group = g;
+ c->_nworkers << 1;
+ g->run_main_task();
+ ...
+}
+```
Review Comment:
这里感觉可以加上TaskGroup* TaskControl::create_group() {
...
if (g->init(FLAGS_task_group_runqueue_capacity) != 0) {
LOG(ERROR) << "Fail to init TaskGroup"
...
}
和下面的TaskGroup的初始化串联起来
##########
content/zh/docs/blogs/sourcecodes/bthread_schedule/index.md:
##########
@@ -0,0 +1,341 @@
+---
+title: "bRPC源码解析·bthread调度执行流程"
+linkTitle: "bRPC源码解析·bthread调度执行流程"
+weight: 3
+date: 2023-05-23
+---
+(作者简介:KIDGINBROOK,在昆仑芯参与训练框架开发工作)
+
+## 整体流程
+task_group负责对bthread的调度执行,一个task_group对应一个pthread,内部有两个执行队列,分别为_rq和_remote_rq,执行队列中存放着待执行的bthread。task_control为全局单例,内部有多个task_group。
+
+
+### 主要接口
+
+#### TaskControl
+TaskControl是一个单例,下面是初始化的过程,主要逻辑即为创建_concurrency个worker(bthread_worker)线程,每个worker执行worker_thread函数
+```c++
+int TaskControl::init(int concurrency) {
+ _concurrency = concurrency;
+ _workers.resize(_concurrency);
+ for (int i = 0; i < _concurrency; ++i) {
+ const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
+ ...
+ }
+ ...
+}
+```
+worker_thread的逻辑为通过create_group创建一个TaskGroup
g,添加到TaskControl中,设置tls_task_group为g,tls_task_group为tls变量,因此只有worker线程的tls_task_group为非null,然后执行TaskGroup的run_main_task函数
+```c++
+void* TaskControl::worker_thread(void* arg) {
+ TaskControl* c = static_cast<TaskControl*>(arg);
+ TaskGroup* g = c->create_group();
+ ...
+ tls_task_group = g;
+ c->_nworkers << 1;
+ g->run_main_task();
+ ...
+}
+```
+
+#### TaskGroup
+TaskGroup对应一个pthread,初始化函数如下,创建rq和remote_rq,都是负责存放待执行bthread的队列,然后创建main_stack和main_tid,main_tid代表主流程对应的bthread
id,后面会具体讲main_stack和main_tid的作用。TaskMeta为一个bthread的meta信息,如执行函数,参数,local
storage等,这里会将cur_meta设置为main_tid对应的TaskMeta。
+
+```c++
+int TaskGroup::init(size_t runqueue_capacity) {
+ _rq.init(runqueue_capacity);
+ _remote_rq.init(runqueue_capacity / 2);
+ ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
+ ...
+ butil::ResourceId<TaskMeta> slot;
+ TaskMeta* m = butil::get_resource<TaskMeta>(&slot);
+ ...
+ m->stop = false;
+ m->interrupted = false;
+ m->about_to_quit = false;
+ m->fn = NULL;
+ m->arg = NULL;
+ m->local_storage = LOCAL_STORAGE_INIT;
+ m->cpuwide_start_ns = butil::cpuwide_time_ns();
+ m->stat = EMPTY_STAT;
+ m->attr = BTHREAD_ATTR_TASKGROUP;
+ m->tid = make_tid(*m->version_butex, slot);
+ m->set_stack(stk);
+
+ _cur_meta = m;
+ _main_tid = m->tid;
+ _main_stack = stk;
+ _last_run_ns = butil::cpuwide_time_ns();
+ return 0;
+}
+
+```
+
+每个worker会一直在while循环中,如果有可执行的bthread,wait_task会返回对应bthread的tid,否则当前worker会阻塞;wait_task(???)的具体逻辑是先去当前task_group的_remote_rq中pop,如果没有,则去其他的task_group的_rq和_remote_rq中pop。
+```c++
+void TaskGroup::run_main_task() {
+ bvar::PassiveStatus<double> cumulated_cputime(
+ get_cumulated_cputime_from_this, this);
+ std::unique_ptr<bvar::PerSecond<bvar::PassiveStatus<double> > > usage_bvar;
+
+ TaskGroup* dummy = this;
+ bthread_t tid;
+ while (wait_task(&tid)) {
+ TaskGroup::sched_to(&dummy, tid);
+ DCHECK_EQ(this, dummy);
+ DCHECK_EQ(_cur_meta->stack, _main_stack);
+ if (_cur_meta->tid != _main_tid) {
+ TaskGroup::task_runner(1/*skip remained*/);
+ }
+ }
+}
+```
+
+当拿到可执行的tid后,调用sched_to,首先通过tid拿到该tid对应的TaskMeta,如果已经为该meta分配过栈,则调用sched_to(pg,
next_meta),该函数的主要逻辑为通过jump_stack(cur_meta->stack,
next_meta->stack)跳转至next_meta;否则通过get_stack分配一个新栈,并设置该栈的执行入口为task_runner函数。
+
+```c++
+inline void TaskGroup::sched_to(TaskGroup** pg, bthread_t next_tid) {
+ TaskMeta* next_meta = address_meta(next_tid);
+ if (next_meta->stack == NULL) {
+ ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
+ if (stk) {
+ next_meta->set_stack(stk);
+ } else {
+ // stack_type is BTHREAD_STACKTYPE_PTHREAD or out of memory,
+ // In latter case, attr is forced to be BTHREAD_STACKTYPE_PTHREAD.
+ // This basically means that if we can't allocate stack, run
+ // the task in pthread directly.
+ next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
+ next_meta->set_stack((*pg)->_main_stack);
+ }
+ }
+ // Update now_ns only when wait_task did yield.
+ sched_to(pg, next_meta);
+}
+
+```
+task_runner核心如下,首先执行remain函数,remain为一个bthread在开始运行自己逻辑前需要做的一些工作,后面会看到;然后执行该meta的函数,因为函数执行过程中该bth可能会调度至其他worker,因此task_group可能发生改变,所以执行完成后重新对g进行设置;最后调用ending_sched。
Review Comment:
这个remain的作用是切换bthread的时候保存上下文么?
##########
content/zh/docs/blogs/sourcecodes/bthread_schedule/index.md:
##########
@@ -0,0 +1,341 @@
+---
+title: "bRPC源码解析·bthread调度执行流程"
+linkTitle: "bRPC源码解析·bthread调度执行流程"
+weight: 3
+date: 2023-05-23
+---
+(作者简介:KIDGINBROOK,在昆仑芯参与训练框架开发工作)
+
+## 整体流程
+task_group负责对bthread的调度执行,一个task_group对应一个pthread,内部有两个执行队列,分别为_rq和_remote_rq,执行队列中存放着待执行的bthread。task_control为全局单例,内部有多个task_group。
+
+
+### 主要接口
+
+#### TaskControl
+TaskControl是一个单例,下面是初始化的过程,主要逻辑即为创建_concurrency个worker(bthread_worker)线程,每个worker执行worker_thread函数
+```c++
+int TaskControl::init(int concurrency) {
+ _concurrency = concurrency;
+ _workers.resize(_concurrency);
+ for (int i = 0; i < _concurrency; ++i) {
+ const int rc = pthread_create(&_workers[i], NULL, worker_thread, this);
+ ...
+ }
+ ...
+}
+```
+worker_thread的逻辑为通过create_group创建一个TaskGroup
g,添加到TaskControl中,设置tls_task_group为g,tls_task_group为tls变量,因此只有worker线程的tls_task_group为非null,然后执行TaskGroup的run_main_task函数
+```c++
+void* TaskControl::worker_thread(void* arg) {
+ TaskControl* c = static_cast<TaskControl*>(arg);
+ TaskGroup* g = c->create_group();
+ ...
+ tls_task_group = g;
+ c->_nworkers << 1;
+ g->run_main_task();
+ ...
+}
+```
+
+#### TaskGroup
+TaskGroup对应一个pthread,初始化函数如下,创建rq和remote_rq,都是负责存放待执行bthread的队列,然后创建main_stack和main_tid,main_tid代表主流程对应的bthread
id,后面会具体讲main_stack和main_tid的作用。TaskMeta为一个bthread的meta信息,如执行函数,参数,local
storage等,这里会将cur_meta设置为main_tid对应的TaskMeta。
+
+```c++
+int TaskGroup::init(size_t runqueue_capacity) {
+ _rq.init(runqueue_capacity);
+ _remote_rq.init(runqueue_capacity / 2);
+ ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
+ ...
+ butil::ResourceId<TaskMeta> slot;
+ TaskMeta* m = butil::get_resource<TaskMeta>(&slot);
+ ...
+ m->stop = false;
+ m->interrupted = false;
+ m->about_to_quit = false;
+ m->fn = NULL;
+ m->arg = NULL;
+ m->local_storage = LOCAL_STORAGE_INIT;
+ m->cpuwide_start_ns = butil::cpuwide_time_ns();
+ m->stat = EMPTY_STAT;
+ m->attr = BTHREAD_ATTR_TASKGROUP;
+ m->tid = make_tid(*m->version_butex, slot);
+ m->set_stack(stk);
+
+ _cur_meta = m;
+ _main_tid = m->tid;
+ _main_stack = stk;
+ _last_run_ns = butil::cpuwide_time_ns();
+ return 0;
+}
+
+```
+
+每个worker会一直在while循环中,如果有可执行的bthread,wait_task会返回对应bthread的tid,否则当前worker会阻塞;wait_task(???)的具体逻辑是先去当前task_group的_remote_rq中pop,如果没有,则去其他的task_group的_rq和_remote_rq中pop。
Review Comment:
这三个?是有意这么写的么,诶?这里当前task_group的rq是做什么用的 不应该先去_rq中pop么?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]