wwbmmm commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1190831296


##########
src/brpc/usercode_thread_pool.h:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_USERCODE_THREAD_POOL_H
+#define BRPC_USERCODE_THREAD_POOL_H
+
+#include <deque>
+#include <mutex>
+#include <functional>
+#include <thread>
+#include <condition_variable>
+#include <gflags/gflags_declare.h>
+#include "butil/atomicops.h"
+#include "bvar/bvar.h"
+#include "bthread/task_meta.h"
+#include "butil/containers/case_ignored_flat_map.h"  // [CaseIgnored]FlatMap
+namespace bthread {
+extern __thread bthread::LocalStorage tls_bls;
+}
+namespace brpc {
+// Store pending user code.
+struct UserCode {

Review Comment:
   Consider renaming this struct to `UserCodeTask`.



##########
src/brpc/usercode_thread_pool.h:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_USERCODE_THREAD_POOL_H
+#define BRPC_USERCODE_THREAD_POOL_H
+
+#include <deque>
+#include <mutex>
+#include <functional>
+#include <thread>
+#include <condition_variable>
+#include <gflags/gflags_declare.h>
+#include "butil/atomicops.h"
+#include "bvar/bvar.h"
+#include "bthread/task_meta.h"
+#include "butil/containers/case_ignored_flat_map.h"  // [CaseIgnored]FlatMap
+namespace bthread {
+extern __thread bthread::LocalStorage tls_bls;
+}
+namespace brpc {
+// Store pending user code.
+struct UserCode {
+    void (*fn)(void*);
+    void* arg;
+    bthread::LocalStorage tls_bls;
+};
+
+class UserCodeThreadAssignPolicy {
+public:
+    UserCodeThreadAssignPolicy() {}
+    virtual ~UserCodeThreadAssignPolicy() {}
+    virtual size_t Index(void* arg, size_t range) = 0;
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(UserCodeThreadAssignPolicy);
+};
+
+class UserCodeThreadRandomAssignPolicy : public UserCodeThreadAssignPolicy {
+public:
+    UserCodeThreadRandomAssignPolicy() {}
+    virtual ~UserCodeThreadRandomAssignPolicy() {}
+    size_t Index(void* arg, size_t range) override;
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(UserCodeThreadRandomAssignPolicy);
+};
+
+class UserCodeThreadPool;
+class UserCodeThreadWorker {
+public:
+    UserCodeThreadWorker(UserCodeThreadPool* pool);
+    void UserCodeRun(UserCode&& usercode);
+    void UserCodeLoop();
+    void Start();
+    void Stop();
+    void Join();
+
+private:
+    UserCodeThreadPool* _pool;
+    std::deque<UserCode> _queue;
+    std::mutex _mutex;
+    std::condition_variable _cond;
+    std::thread _worker;
+    std::atomic<bool> _running;               // running flag
+    static std::atomic<int> _next_worker_id;  // worker id
+};
+// "user code thread pool" configuration
+struct UserCodeThreadPoolConf {
+    UserCodeThreadPoolConf(const std::string& pool_name,
+                           const std::string& num_threads,
+                           const std::function<void()>& startfn,
+                           UserCodeThreadAssignPolicy* policy)
+        : pool_name(pool_name),
+          num_threads(num_threads),
+          thread_startfn(startfn),
+          assign_policy(policy) {}
+    std::string pool_name;                      // pool name
+    std::string num_threads;                    // thread number
+    std::function<void()> thread_startfn;       // thread start function
+    UserCodeThreadAssignPolicy* assign_policy;  // thread assign policy
+};
+// "user code thread pool" is a set of pthreads to allow run user code in this
+// pool for some methods
+class UserCodeThreadPool {
+    static double GetInPoolElapseInSecond(void*);

Review Comment:
   private 的成员是不是应该放在 public 后面?



##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock<std::mutex> lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast<bvar::Adder<int64_t>*>(arg)->get_value() / 1000000.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast<UserCodeThreadPool*>(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function<void()>& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std::string& num_threads) {
+    // Like bthread workers, these threads never quit (to avoid potential hang
+    // during termination of program).
+    int32_t num = 0;
+    GFLAGS_NS::CommandLineFlagInfo info;
+    auto ret = GFLAGS_NS::GetCommandLineFlagInfo(num_threads.c_str(), &info);
+    if (ret) {
+        if (info.type != "int32") {
+            LOG(ERROR) << "Failed gflags type need int32";
+            return false;
+        }
+        _gflag_num_threads = static_cast<const int32_t*>(info.flag_ptr);
+        num = *_gflag_num_threads;
+    } else {
+        num = std::atoi(num_threads.c_str());
+    }
+    if (num <= 0) {
+        LOG(ERROR) << "Failed parameter for usercode pool init";
+        return false;
+    }
+    SetNumThreads(num);
+    return true;
+}
+
+void UserCodeThreadPool::RunUserCode(void (*fn)(void*), void* arg) {
+    if (_gflag_num_threads != nullptr) {
+        size_t num_threads = *_gflag_num_threads;
+        if (num_threads != _workers.size()) {
+            SetNumThreads(num_threads);
+        }
+    }
+    auto range = GetNumThreads();
+    auto index = _assign_policy->Index(arg, range);
+    auto& worker = _workers[index];

Review Comment:
   是不是得判断一下 index 有没有越界?



##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;

Review Comment:
   这个可能有坑:tls_bls 里面的对象有可能在 bthread worker 里释放了,但是 usercode worker 里还在用。



##########
src/brpc/usercode_thread_pool.h:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_USERCODE_THREAD_POOL_H
+#define BRPC_USERCODE_THREAD_POOL_H
+
+#include <deque>
+#include <mutex>
+#include <functional>
+#include <thread>
+#include <condition_variable>
+#include <gflags/gflags_declare.h>
+#include "butil/atomicops.h"
+#include "bvar/bvar.h"
+#include "bthread/task_meta.h"
+#include "butil/containers/case_ignored_flat_map.h"  // [CaseIgnored]FlatMap
+namespace bthread {
+extern __thread bthread::LocalStorage tls_bls;
+}
+namespace brpc {
+// Store pending user code.
+struct UserCode {
+    void (*fn)(void*);
+    void* arg;
+    bthread::LocalStorage tls_bls;
+};
+
+class UserCodeThreadAssignPolicy {
+public:
+    UserCodeThreadAssignPolicy() {}
+    virtual ~UserCodeThreadAssignPolicy() {}
+    virtual size_t Index(void* arg, size_t range) = 0;
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(UserCodeThreadAssignPolicy);
+};
+
+class UserCodeThreadRandomAssignPolicy : public UserCodeThreadAssignPolicy {
+public:
+    UserCodeThreadRandomAssignPolicy() {}
+    virtual ~UserCodeThreadRandomAssignPolicy() {}
+    size_t Index(void* arg, size_t range) override;
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(UserCodeThreadRandomAssignPolicy);
+};
+
+class UserCodeThreadPool;
+class UserCodeThreadWorker {
+public:
+    UserCodeThreadWorker(UserCodeThreadPool* pool);
+    void UserCodeRun(UserCode&& usercode);
+    void UserCodeLoop();
+    void Start();
+    void Stop();
+    void Join();
+
+private:
+    UserCodeThreadPool* _pool;
+    std::deque<UserCode> _queue;
+    std::mutex _mutex;
+    std::condition_variable _cond;
+    std::thread _worker;
+    std::atomic<bool> _running;               // running flag
+    static std::atomic<int> _next_worker_id;  // worker id
+};
+// "user code thread pool" configuration
+struct UserCodeThreadPoolConf {
+    UserCodeThreadPoolConf(const std::string& pool_name,
+                           const std::string& num_threads,
+                           const std::function<void()>& startfn,
+                           UserCodeThreadAssignPolicy* policy)
+        : pool_name(pool_name),
+          num_threads(num_threads),
+          thread_startfn(startfn),
+          assign_policy(policy) {}
+    std::string pool_name;                      // pool name
+    std::string num_threads;                    // thread number
+    std::function<void()> thread_startfn;       // thread start function
+    UserCodeThreadAssignPolicy* assign_policy;  // thread assign policy
+};
+// "user code thread pool" is a set of pthreads to allow run user code in this
+// pool for some methods
+class UserCodeThreadPool {
+    static double GetInPoolElapseInSecond(void*);
+    static size_t GetUserCodeThreadSize(void*);
+    void StopAndJoin();
+    void SetNumThreads(size_t);
+    size_t GetNumThreads();
+
+public:
+    bvar::Adder<size_t> inpool_count;

Review Comment:
   这些成员需要是 public 吗?



##########
src/brpc/usercode_thread_pool.h:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_USERCODE_THREAD_POOL_H
+#define BRPC_USERCODE_THREAD_POOL_H
+
+#include <deque>
+#include <mutex>
+#include <functional>
+#include <thread>
+#include <condition_variable>
+#include <gflags/gflags_declare.h>
+#include "butil/atomicops.h"
+#include "bvar/bvar.h"
+#include "bthread/task_meta.h"
+#include "butil/containers/case_ignored_flat_map.h"  // [CaseIgnored]FlatMap
+namespace bthread {
+extern __thread bthread::LocalStorage tls_bls;

Review Comment:
   这个需要暴露在头文件里吗?放在 cpp 里就可以吧。



##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock<std::mutex> lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast<bvar::Adder<int64_t>*>(arg)->get_value() / 1000000.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast<UserCodeThreadPool*>(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function<void()>& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std::string& num_threads) {
+    // Like bthread workers, these threads never quit (to avoid potential hang
+    // during termination of program).
+    int32_t num = 0;
+    GFLAGS_NS::CommandLineFlagInfo info;
+    auto ret = GFLAGS_NS::GetCommandLineFlagInfo(num_threads.c_str(), &info);

Review Comment:
   感觉这里不要耦合 gflags 吧,直接传一个 int 进来是不是就可以?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to