gydong commented on issue #1243:
URL: https://github.com/apache/incubator-brpc/issues/1243#issuecomment-693776704
我们内部有个类似实现,仅供参考:
```cpp
// This class is NOT thread safe.
class BthreadGroup {
public:
class Task;
BthreadGroup();
~BthreadGroup();
// Always return true if call it before "Start()", return false otherwise.
bool AddOneTask(std::unique_ptr<Task>&& task);
void Start(int timeout_us = 0);
// The signature of TCallable is " void (Task*)".
// Note: if u start the group with a timeout, then use this function to
// retrieve your tasks, or there is a race condition.
template <typename TCallable>
void ForeachFinishedTask(const TCallable& func);
private:
struct SharedPart;
void StartBthread();
// Ihis just set a "stop flag" simply, and the user logic should stop
// according to Task::ShouldStop().
void NotifyStop();
bool is_started_ = false;
std::shared_ptr<SharedPart> sp_;
std::vector<std::unique_ptr<Task>> tasks_;
};
class BthreadGroup::Task {
public:
Task();
virtual ~Task();
virtual void Run() = 0;
protected:
// User should stop the task if this function return true. User can also
// ignore this function, and then this task will stop and delete itself.
bool ShouldStop() const;
private:
friend class BthreadGroup;
static void* InternalRun(void* data);
std::atomic<bool> is_finished_{false};
std::atomic<bool> should_stop_{false};
bthread_t bthread_ = INVALID_BTHREAD;
std::weak_ptr<BthreadGroup::SharedPart> sp_;
};
template <typename TCallable>
void BthreadGroup::ForeachFinishedTask(const TCallable& func) {
for (std::unique_ptr<Task>& task : tasks_) {
if (task && task->is_finished_.exchange(true,
std::memory_order_seq_cst)) {
func(task.get());
} else {
task.release();
}
}
}
```
```cpp
namespace {
bvar::Adder<int> g_bthread_groups("bthread_group_count");
bvar::Adder<int>
g_bthread_group_timeout_count("bthread_group_timeout_count");
bvar::Adder<int> g_bthread_group_tasks("bthread_group_task_count");
} // namespace
struct BthreadGroup::SharedPart {
SharedPart() : ce(0) { }
bthread::CountdownEvent ce;
};
BthreadGroup::BthreadGroup() : sp_(std::make_shared<SharedPart>()) {
g_bthread_groups << 1;
}
bool BthreadGroup::AddOneTask(std::unique_ptr<Task>&& task) {
if (is_started_) {
return false;
}
task->sp_ = sp_;
tasks_.emplace_back(std::move(task));
sp_->ce.add_count();
return true;
}
void BthreadGroup::Start(int timeout_us) {
StartBthread();
is_started_ = true;
if (timeout_us > 0) {
const timespec end_time = butil::microseconds_from_now(timeout_us);
const int status = sp_->ce.timed_wait(end_time);
if (status == ETIMEDOUT) {
g_bthread_group_timeout_count << 1;
} else if (status != 0) {
LOG(ERROR) << "bthread group is stopped du to: " << berror(status);
}
} else {
const int status = sp_->ce.wait();
if (status != 0) {
LOG(ERROR) << "bthread group is stopped du to: " << berror(status);
}
}
NotifyStop();
}
void BthreadGroup::StartBthread() {
if (tasks_.empty()) {
return;
}
for (size_t i = 0; i < tasks_.size() - 1; ++i) {
CHECK_EQ(0, bthread_start_background(
&tasks_[i]->bthread_, nullptr, Task::InternalRun,
tasks_[i].get()))
<< "Failed to start a bthread, error: " << berror();
}
CHECK_EQ(0, bthread_start_urgent(&tasks_.back()->bthread_,
nullptr,
Task::InternalRun,
tasks_.back().get()))
<< "Failed to start a bthread, error: " << berror();
}
void BthreadGroup::NotifyStop() {
for (const std::unique_ptr<Task>& task : tasks_) {
task->should_stop_.store(true, std::memory_order_release);
}
}
BthreadGroup::~BthreadGroup() {
// Ensure the task thread aware of the finish state.
ForeachFinishedTask([](Task* dummy){});
g_bthread_groups << -1;
}
BthreadGroup::Task::Task() {
g_bthread_group_tasks << 1;
}
BthreadGroup::Task::~Task() {
g_bthread_group_tasks << -1;
}
bool BthreadGroup::Task::ShouldStop() const {
return should_stop_.load(std::memory_order_acquire) ||
1 == bthread_stopped(bthread_self());
}
void* BthreadGroup::Task::InternalRun(void* data) {
Task* task = static_cast<Task*>(data);
task->Run();
if (task->is_finished_.exchange(true, std::memory_order_seq_cst)) {
delete task;
} else if (std::shared_ptr<SharedPart> sp = task->sp_.lock()) {
sp->ce.signal();
}
return nullptr;
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]