This is an automated email from the ASF dual-hosted git repository.

airborne pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 26aacf428d2 [opt](ann index) omp_threads_limit working better & minor change for observability (#56796)
26aacf428d2 is described below

commit 26aacf428d2dfa2bb2ca38309e3de0dd708057ab
Author: zhiqiang <[email protected]>
AuthorDate: Sat Oct 18 16:37:56 2025 +0800

    [opt](ann index) omp_threads_limit working better & minor change for observability (#56796)
    
    This pull request introduces a new mechanism for managing OpenMP thread
    usage during concurrent FAISS vector index builds in Doris, improving
    resource control and stability. It adds a global thread budget guard to
    ensure that the total number of threads used does not exceed a
    configurable limit, and provides metrics for monitoring thread usage.
    Additionally, thread naming is temporarily set for easier debugging
    during index build phases.
---
 be/src/common/config.cpp                           | 19 +++--
 be/src/common/config.h                             |  3 +-
 .../segment_v2/ann_index/faiss_ann_index.cpp       | 90 ++++++++++++++++++++--
 be/src/util/doris_metrics.cpp                      |  2 +
 be/src/util/doris_metrics.h                        |  1 +
 5 files changed, 103 insertions(+), 12 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c2ce46bb06e..32683dc132e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1585,11 +1585,20 @@ 
DEFINE_mBool(enable_auto_clone_on_mow_publish_missing_version, "false");
 // The maximum csv line reader output buffer size
 DEFINE_mInt64(max_csv_line_reader_output_buffer_size, "4294967296");
 
-// Maximum number of openmp threads can be used by each doris threads.
-// This configuration controls the parallelism level for OpenMP operations 
within Doris,
-// helping to prevent resource contention and ensure stable performance when 
multiple
-// Doris threads are executing OpenMP-accelerated operations simultaneously.
-DEFINE_mInt32(omp_threads_limit, "8");
+// Maximum number of OpenMP threads allowed for concurrent vector index builds.
+// -1 means auto: use 80% of the available CPU cores.
+DEFINE_Int32(omp_threads_limit, "-1");
+DEFINE_Validator(omp_threads_limit, [](const int config) -> bool {
+    CpuInfo::init();
+    int core_cap = config::num_cores > 0 ? config::num_cores : 
CpuInfo::num_cores();
+    core_cap = std::max(1, core_cap);
+    int limit = config;
+    if (limit < 0) {
+        limit = std::max(1, core_cap * 4 / 5);
+    }
+    omp_threads_limit = std::max(1, std::min(limit, core_cap));
+    return true;
+});
 // The capacity of segment partial column cache, used to cache column readers 
for each segment.
 DEFINE_mInt32(max_segment_partial_column_cache_size, "100");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index df83ae46909..afba63bebe5 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1642,7 +1642,8 @@ DECLARE_String(fuzzy_test_type);
 // The maximum csv line reader output buffer size
 DECLARE_mInt64(max_csv_line_reader_output_buffer_size);
 
-// Maximum number of OpenMP threads that can be used by each Doris thread
+// Maximum number of OpenMP threads available for concurrent index builds.
+// -1 means auto: use 80% of detected CPU cores.
 DECLARE_Int32(omp_threads_limit);
 // The capacity of segment partial column cache, used to cache column readers 
for each segment.
 DECLARE_mInt32(max_segment_partial_column_cache_size);
diff --git a/be/src/olap/rowset/segment_v2/ann_index/faiss_ann_index.cpp 
b/be/src/olap/rowset/segment_v2/ann_index/faiss_ann_index.cpp
index aacbd183beb..7c9d9bb22d0 100644
--- a/be/src/olap/rowset/segment_v2/ann_index/faiss_ann_index.cpp
+++ b/be/src/olap/rowset/segment_v2/ann_index/faiss_ann_index.cpp
@@ -19,12 +19,16 @@
 
 #include <faiss/index_io.h>
 #include <omp.h>
+#include <pthread.h>
 
+#include <algorithm>
 #include <cmath>
+#include <condition_variable>
 #include <cstddef>
 #include <cstdint>
 #include <limits>
 #include <memory>
+#include <mutex>
 #include <string>
 
 #include "CLucene/store/IndexInput.h"
@@ -42,11 +46,77 @@
 #include "olap/rowset/segment_v2/ann_index/ann_index_files.h"
 #include "olap/rowset/segment_v2/ann_index/ann_search_params.h"
 #include "util/doris_metrics.h"
+#include "util/thread.h"
 #include "util/time.h"
 #include "vec/core/types.h"
 
 namespace doris::segment_v2 {
 #include "common/compile_check_begin.h"
+
+namespace {
+
+std::mutex g_omp_thread_mutex;
+int g_index_threads_in_use = 0;
+
+// Guard that ensures the total OpenMP threads used by concurrent index builds
+// never exceed the configured omp_threads_limit.
+class ScopedOmpThreadBudget {
+public:
+    // For each index build, reserve at most half of the remaining threads, at 
least 1 thread.
+    ScopedOmpThreadBudget() {
+        std::unique_lock<std::mutex> lock(g_omp_thread_mutex);
+        auto thread_cap = config::omp_threads_limit - g_index_threads_in_use;
+        _reserved_threads = std::max(1, thread_cap / 2);
+        g_index_threads_in_use += _reserved_threads;
+        
DorisMetrics::instance()->ann_index_build_index_threads->increment(_reserved_threads);
+        omp_set_num_threads(_reserved_threads);
+        VLOG_DEBUG << fmt::format(
+                "ScopedOmpThreadBudget reserve threads reserved={}, in_use={}, 
limit={}",
+                _reserved_threads, g_index_threads_in_use, 
config::omp_threads_limit);
+    }
+
+    ~ScopedOmpThreadBudget() {
+        std::lock_guard<std::mutex> lock(g_omp_thread_mutex);
+        g_index_threads_in_use -= _reserved_threads;
+        
DorisMetrics::instance()->ann_index_build_index_threads->increment(-_reserved_threads);
+        if (g_index_threads_in_use < 0) {
+            g_index_threads_in_use = 0;
+        }
+        VLOG_DEBUG << fmt::format(
+                "ScopedOmpThreadBudget release threads reserved={}, 
remaining_in_use={}, limit={}",
+                _reserved_threads, g_index_threads_in_use, 
config::omp_threads_limit);
+    }
+
+private:
+    int _reserved_threads = 1;
+};
+
+// Temporarily rename the current thread so FAISS build phases are easier to 
spot in debuggers.
+class ScopedThreadName {
+public:
+    explicit ScopedThreadName(const std::string& new_name) {
+        // POSIX limits thread names to 15 visible chars plus the null 
terminator.
+        char current_name[16] = {0};
+        int ret = pthread_getname_np(pthread_self(), current_name, 
sizeof(current_name));
+        if (ret == 0) {
+            _has_previous_name = true;
+            _previous_name = current_name;
+        }
+        Thread::set_self_name(new_name);
+    }
+
+    ~ScopedThreadName() {
+        if (_has_previous_name) {
+            Thread::set_self_name(_previous_name);
+        }
+    }
+
+private:
+    bool _has_previous_name = false;
+    std::string _previous_name;
+};
+
+} // namespace
 std::unique_ptr<faiss::IDSelector> FaissVectorIndex::roaring_to_faiss_selector(
         const roaring::Roaring& roaring) {
     std::vector<faiss::idx_t> ids;
@@ -154,7 +224,9 @@ public:
 void FaissVectorIndex::train(vectorized::Int64 n, const float* x) {
     DCHECK(x != nullptr);
     DCHECK(_index != nullptr);
-    omp_set_num_threads(config::omp_threads_limit);
+    ScopedThreadName scoped_name("faiss_train_idx");
+    // Reserve OpenMP threads globally so concurrent builds stay under 
omp_threads_limit.
+    ScopedOmpThreadBudget thread_budget;
     _index->train(n, x);
 }
 
@@ -169,11 +241,17 @@ void FaissVectorIndex::train(vectorized::Int64 n, const 
float* x) {
 doris::Status FaissVectorIndex::add(vectorized::Int64 n, const float* vec) {
     DCHECK(vec != nullptr);
     DCHECK(_index != nullptr);
-    omp_set_num_threads(config::omp_threads_limit);
-    DorisMetrics::instance()->ann_index_construction->increment(1);
-    _index->add(n, vec);
-    DorisMetrics::instance()->ann_index_construction->increment(-1);
-    DorisMetrics::instance()->ann_index_in_memory_rows_cnt->increment(n);
+    ScopedThreadName scoped_name("faiss_build_idx");
+    // build index for every 1M rows, so that we can adjust thread usage 
dynamically.
+    for (vectorized::Int64 i = 0; i < n; i += 1'000'000) {
+        // Apply the same thread budget when adding vectors to limit 
concurrency.
+        ScopedOmpThreadBudget thread_budget;
+        DorisMetrics::instance()->ann_index_construction->increment(1);
+        vectorized::Int64 chunk_size = std::min(1'000'000L, n - i);
+        _index->add(chunk_size, vec + i * _dimension);
+        
DorisMetrics::instance()->ann_index_in_memory_rows_cnt->increment(chunk_size);
+        DorisMetrics::instance()->ann_index_construction->increment(-1);
+    }
     return doris::Status::OK();
 }
 
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index c95a9ef02a0..0c3a713aab7 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -250,6 +250,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(ann_index_search_cnt, 
MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(ann_index_in_memory_cnt, 
MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(ann_index_in_memory_rows_cnt, 
MetricUnit::ROWS);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(ann_index_construction, 
MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(ann_index_build_index_threads, 
MetricUnit::NOUNIT);
 
 const std::string DorisMetrics::_s_registry_name = "doris_be";
 const std::string DorisMetrics::_s_hook_name = "doris_metrics";
@@ -418,6 +419,7 @@ DorisMetrics::DorisMetrics() : 
_metric_registry(_s_registry_name) {
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
ann_index_in_memory_cnt);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
ann_index_in_memory_rows_cnt);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, ann_index_construction);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, 
ann_index_build_index_threads);
 }
 
 void DorisMetrics::initialize(bool init_system_metrics, const 
std::set<std::string>& disk_devices,
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 906e8b9a7fa..6e1c3e1a30e 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -258,6 +258,7 @@ public:
     IntCounter* ann_index_in_memory_cnt = nullptr;
     IntCounter* ann_index_in_memory_rows_cnt = nullptr;
     IntCounter* ann_index_construction = nullptr;
+    IntCounter* ann_index_build_index_threads = nullptr;
 
     IntGauge* runtime_filter_consumer_num = nullptr;
     IntGauge* runtime_filter_consumer_ready_num = nullptr;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to