This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new db0dbda5f6 [fix](merge-on-write) fix dead lock when publish (#21339) (#24209)
db0dbda5f6 is described below
commit db0dbda5f6196f5972fb032234a78b8bba1ece1c
Author: Xin Liao <[email protected]>
AuthorDate: Tue Sep 12 11:07:19 2023 +0800
[fix](merge-on-write) fix dead lock when publish (#21339) (#24209)
---
be/src/olap/task/engine_publish_version_task.cpp | 35 ++++--------------------
be/src/olap/task/engine_publish_version_task.h | 7 -----
2 files changed, 6 insertions(+), 36 deletions(-)
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index eff14dfdb0..c793ddbf6c 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -34,8 +34,7 @@ using std::map;
EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest&
publish_version_req,
std::vector<TTabletId>*
error_tablet_ids,
std::vector<TTabletId>*
succ_tablet_ids)
- : _total_task_num(0),
- _publish_version_req(publish_version_req),
+ : _publish_version_req(publish_version_req),
_error_tablet_ids(error_tablet_ids),
_succ_tablet_ids(succ_tablet_ids) {}
@@ -49,25 +48,14 @@ void EnginePublishVersionTask::add_succ_tablet_id(int64_t
tablet_id) {
_succ_tablet_ids->push_back(tablet_id);
}
-void EnginePublishVersionTask::wait() {
- std::unique_lock<std::mutex> lock(_tablet_finish_mutex);
- _tablet_finish_cond.wait(lock);
-}
-
-void EnginePublishVersionTask::notify() {
- std::unique_lock<std::mutex> lock(_tablet_finish_mutex);
- _tablet_finish_cond.notify_one();
-}
-
-int64_t EnginePublishVersionTask::finish_task() {
- return _total_task_num.fetch_sub(1);
-}
-
Status EnginePublishVersionTask::finish() {
Status res = Status::OK();
int64_t transaction_id = _publish_version_req.transaction_id;
OlapStopWatch watch;
VLOG_NOTICE << "begin to process publish version. transaction_id=" <<
transaction_id;
+ std::unique_ptr<ThreadPoolToken> token =
+
StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token(
+ ThreadPool::ExecutionMode::CONCURRENT);
// each partition
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
@@ -145,19 +133,13 @@ Status EnginePublishVersionTask::finish() {
continue;
}
}
- _total_task_num.fetch_add(1);
auto tablet_publish_txn_ptr =
std::make_shared<TabletPublishTxnTask>(
this, tablet, rowset, partition_id, transaction_id,
version, tablet_info);
- auto submit_st =
-
StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func(
- [=]() { tablet_publish_txn_ptr->handle(); });
+ auto submit_st = token->submit_func([=]() {
tablet_publish_txn_ptr->handle(); });
CHECK(submit_st.ok()) << submit_st;
}
}
- // wait for all publish txn finished
- while (_total_task_num.load() != 0) {
- wait();
- }
+ token->wait();
// check if the related tablet remained all have the version
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
@@ -209,11 +191,6 @@
TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task
_tablet_info(tablet_info) {}
void TabletPublishTxnTask::handle() {
- Defer defer {[&] {
- if (_engine_publish_version_task->finish_task() == 1) {
- _engine_publish_version_task->notify();
- }
- }};
auto publish_status =
StorageEngine::instance()->txn_manager()->publish_txn(
_partition_id, _tablet, _transaction_id, _version);
if (publish_status != Status::OK()) {
diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h
index 0a0de3efc0..0a918ba402 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -57,20 +57,13 @@ public:
void add_error_tablet_id(int64_t tablet_id);
void add_succ_tablet_id(int64_t tablet_id);
- void notify();
- void wait();
-
int64_t finish_task();
private:
- std::atomic<int64_t> _total_task_num;
const TPublishVersionRequest& _publish_version_req;
std::mutex _tablet_ids_mutex;
vector<TTabletId>* _error_tablet_ids;
vector<TTabletId>* _succ_tablet_ids;
-
- std::mutex _tablet_finish_mutex;
- std::condition_variable _tablet_finish_cond;
};
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]