This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 9958eb91a3f branch-2.1: [fix](compaction) fix compaction producer hold for permits leak #45664 (#46753)
9958eb91a3f is described below
commit 9958eb91a3f18c5ec0d04269291d8209ecaff17e
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jan 10 15:53:01 2025 +0800
branch-2.1: [fix](compaction) fix compaction producer hold for permits leak #45664 (#46753)
Cherry-picked from #45664
Co-authored-by: shee <[email protected]>
Co-authored-by: garenshi <[email protected]>
Co-authored-by: camby <[email protected]>
---
be/src/olap/olap_server.cpp | 36 ++++++++++++++++--------------------
1 file changed, 16 insertions(+), 20 deletions(-)
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 71c8fb38681..014213c4694 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -633,20 +633,6 @@ void StorageEngine::_compaction_tasks_producer_callback() {
last_base_score_update_time = cur_time;
}
}
- std::unique_ptr<ThreadPool>& thread_pool =
- (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
- ? _cumu_compaction_thread_pool
- : _base_compaction_thread_pool;
- VLOG_CRITICAL << "compaction thread pool. type: "
- << (compaction_type ==
CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
-
: "BASE")
- << ", num_threads: " << thread_pool->num_threads()
- << ", num_threads_pending_start: "
- << thread_pool->num_threads_pending_start()
- << ", num_active_threads: " <<
thread_pool->num_active_threads()
- << ", max_threads: " << thread_pool->max_threads()
- << ", min_threads: " << thread_pool->min_threads()
- << ", num_total_queued_tasks: " <<
thread_pool->get_queue_size();
std::vector<TabletSharedPtr> tablets_compaction =
_generate_compaction_tasks(compaction_type, data_dirs,
check_score);
if (tablets_compaction.size() == 0) {
@@ -1043,23 +1029,33 @@ Status
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
(compaction_type == CompactionType::CUMULATIVE_COMPACTION)
? _cumu_compaction_thread_pool
: _base_compaction_thread_pool;
+ VLOG_CRITICAL << "compaction thread pool. type: "
+ << (compaction_type ==
CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
+
: "BASE")
+ << ", num_threads: " << thread_pool->num_threads()
+ << ", num_threads_pending_start: " <<
thread_pool->num_threads_pending_start()
+ << ", num_active_threads: " <<
thread_pool->num_active_threads()
+ << ", max_threads: " << thread_pool->max_threads()
+ << ", min_threads: " << thread_pool->min_threads()
+ << ", num_total_queued_tasks: " <<
thread_pool->get_queue_size();
auto st = thread_pool->submit_func([tablet, compaction =
std::move(compaction),
compaction_type, permits, force,
this]() {
+ Defer defer {[&]() {
+ if (!force) {
+ _permit_limiter.release(permits);
+ }
+ _pop_tablet_from_submitted_compaction(tablet, compaction_type);
+ tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
+ }};
if (!tablet->can_do_compaction(tablet->data_dir()->path_hash(),
compaction_type)) {
LOG(INFO) << "Tablet state has been changed, no need to begin
this compaction "
"task, tablet_id="
<< tablet->tablet_id() << ", tablet_state=" <<
tablet->tablet_state();
- _pop_tablet_from_submitted_compaction(tablet, compaction_type);
return;
}
tablet->compaction_stage = CompactionStage::EXECUTING;
TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction");
tablet->execute_compaction(*compaction);
- if (!force) {
- _permit_limiter.release(permits);
- }
- _pop_tablet_from_submitted_compaction(tablet, compaction_type);
- tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
});
if (!st.ok()) {
if (!force) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]