This is an automated email from the ASF dual-hosted git repository.
hello-stephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4a0c58bcbcd [fix](cloud) Drain txn lazy committer workers before
destruction (#63876)
4a0c58bcbcd is described below
commit 4a0c58bcbcdb2f257f64ef2a856d39d0186bf7cd
Author: Gavin Chou <[email protected]>
AuthorDate: Mon Jun 1 10:10:37 2026 +0800
[fix](cloud) Drain txn lazy committer workers before destruction (#63876)
## What
Fix shutdown ordering in `TxnLazyCommitter` by explicitly stopping
worker pools before member destruction can invalidate state used by
worker callbacks.
## Why
Lazy commit worker jobs keep a back pointer to `TxnLazyCommitter` and
call back into `remove()`. They can also access the parallel commit pool
and resource manager during `commit()`. With the default destructor,
`running_tasks_`, `mutex_`, and `parallel_commit_pool_` are destroyed
before `worker_pool_` is joined, which can lead to shutdown-time
use-after-destruction.
## How
- Add an explicit `TxnLazyCommitter` destructor.
- Mark the committer as stopped before draining workers.
- Stop and join the lazy commit worker pool before destroying task
tracking state.
- Stop the parallel commit pool after lazy workers are quiesced.
- Make failed or post-shutdown submissions complete with an error
instead of leaving waiters blocked.
## Tests
- `sh format_code.sh cloud/src/meta-service/txn_lazy_committer.h`
- `sh format_code.sh cloud/src/meta-service/txn_lazy_committer.cpp`
- `sh run-cloud-ut.sh --run --fdb
"fdb_cluster0:[email protected]:4500"`
- Build passed.
- `txn_lazy_commit_test` passed 24/24 in the full run.
- The full run had unrelated storage vault/HDFS failures in
`meta_service_test`.
- After tightening the submit/shutdown race:
- `sh run-cloud-ut.sh --run --fdb
"fdb_cluster0:[email protected]:4500" --filter
"txn_lazy_commit_test:*.*"`
- Build passed; 22/24 passed, 2 tests failed due to FDB `Timeout` while
committing setup transactions.
Co-authored-by: gavinchou <[email protected]>
---
cloud/src/meta-service/txn_lazy_committer.cpp | 57 +++++++++++++++++++++------
cloud/src/meta-service/txn_lazy_committer.h | 9 ++++-
2 files changed, 54 insertions(+), 12 deletions(-)
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp
b/cloud/src/meta-service/txn_lazy_committer.cpp
index 34d764a1324..308dfd2fc4d 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -633,14 +633,20 @@ TxnLazyCommitTask::TxnLazyCommitTask(const std::string&
instance_id, int64_t txn
DCHECK(txn_id > 0);
}
+void TxnLazyCommitTask::finish(MetaServiceCode code, std::string msg) {
+ {
+ std::unique_lock lock(mutex_);
+ finished_ = true;
+ code_ = code;
+ msg_ = std::move(msg);
+ }
+ cond_.notify_all();
+}
+
void TxnLazyCommitTask::commit() {
StopWatch sw;
DORIS_CLOUD_DEFER {
- {
- std::unique_lock lock(mutex_);
- this->finished_ = true;
- }
- this->cond_.notify_all();
+ finish(code_, msg_);
g_bvar_txn_lazy_committer_committing_duration << sw.elapsed_us();
};
@@ -965,6 +971,25 @@ TxnLazyCommitter::TxnLazyCommitter(std::shared_ptr<TxnKv>
txn_kv,
parallel_commit_pool_->start();
}
+TxnLazyCommitter::~TxnLazyCommitter() {
+ {
+ std::unique_lock<std::mutex> lock(mutex_);
+ stopped_ = true;
+ }
+
+ if (worker_pool_ != nullptr) {
+ worker_pool_->stop();
+ }
+ if (parallel_commit_pool_ != nullptr) {
+ parallel_commit_pool_->stop();
+ }
+
+ {
+ std::unique_lock<std::mutex> lock(mutex_);
+ running_tasks_.clear();
+ }
+}
+
/**
* @brief Submit a lazy commit txn task
*
@@ -978,6 +1003,12 @@ std::shared_ptr<TxnLazyCommitTask>
TxnLazyCommitter::submit(const std::string& i
std::shared_ptr<TxnLazyCommitTask> task;
{
std::unique_lock<std::mutex> lock(mutex_);
+ if (stopped_) {
+ task = std::make_shared<TxnLazyCommitTask>(instance_id, txn_id,
txn_kv_, this);
+ task->finish(MetaServiceCode::UNDEFINED_ERR, "txn lazy committer
is stopped");
+ return task;
+ }
+
auto iter = running_tasks_.find(txn_id);
if (iter != running_tasks_.end()) {
return iter->second;
@@ -986,13 +1017,17 @@ std::shared_ptr<TxnLazyCommitTask>
TxnLazyCommitter::submit(const std::string& i
task = std::make_shared<TxnLazyCommitTask>(instance_id, txn_id,
txn_kv_, this);
running_tasks_.emplace(txn_id, task);
g_bvar_txn_lazy_committer_submitted << 1;
- }
- worker_pool_->submit([task]() {
- task->commit();
- task->txn_lazy_committer_->remove(task->txn_id_);
- g_bvar_txn_lazy_committer_finished << 1;
- });
+ int ret = worker_pool_->submit([task]() {
+ task->commit();
+ task->txn_lazy_committer_->remove(task->txn_id_);
+ g_bvar_txn_lazy_committer_finished << 1;
+ });
+ if (ret != 0) {
+ running_tasks_.erase(txn_id);
+ task->finish(MetaServiceCode::UNDEFINED_ERR, "failed to submit txn
lazy commit task");
+ }
+ }
DCHECK(task != nullptr);
return task;
}
diff --git a/cloud/src/meta-service/txn_lazy_committer.h
b/cloud/src/meta-service/txn_lazy_committer.h
index 048ff7cf308..0ac6591029c 100644
--- a/cloud/src/meta-service/txn_lazy_committer.h
+++ b/cloud/src/meta-service/txn_lazy_committer.h
@@ -45,6 +45,11 @@ public:
private:
friend class TxnLazyCommitter;
+ // Marks the task as finished with the final result and wakes all waiters.
+ // `code` is returned by wait() as the task status, and `msg` carries the
+ // corresponding error detail or an empty string on success.
+ void finish(MetaServiceCode code, std::string msg);
+
std::pair<MetaServiceCode, std::string> commit_partition(
int64_t db_id, int64_t partition_id,
const std::vector<std::pair<std::string,
doris::RowsetMetaCloudPB>>& tmp_rowset_metas,
@@ -66,6 +71,7 @@ class TxnLazyCommitter {
public:
TxnLazyCommitter(std::shared_ptr<TxnKv> txn_kv);
TxnLazyCommitter(std::shared_ptr<TxnKv> txn_kv,
std::shared_ptr<ResourceManager> resource_mgr);
+ ~TxnLazyCommitter();
std::shared_ptr<TxnLazyCommitTask> submit(const std::string& instance_id,
int64_t txn_id);
void remove(int64_t txn_id);
@@ -82,5 +88,6 @@ private:
std::mutex mutex_;
// <txn_id, TxnLazyCommitTask>
std::unordered_map<int64_t, std::shared_ptr<TxnLazyCommitTask>>
running_tasks_;
+ bool stopped_ = false;
};
-} // namespace doris::cloud
\ No newline at end of file
+} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]