This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ba7f5d892db [Improvement]Log be thread num (#37289)
ba7f5d892db is described below
commit ba7f5d892db4dcec76c396e836e23cce3eb02544
Author: wangbo <[email protected]>
AuthorDate: Mon Jul 8 19:39:27 2024 +0800
[Improvement]Log be thread num (#37289)
---
be/src/common/config.cpp | 3 +
be/src/common/config.h | 2 +
be/src/common/daemon.cpp | 14 ++++
be/src/common/daemon.h | 1 +
.../action/be_proc_thread_action.cpp} | 40 +++-------
.../action/be_proc_thread_action.h} | 39 +++-------
be/src/runtime/be_proc_monitor.cpp | 88 ++++++++++++++++++++++
.../{common/daemon.h => runtime/be_proc_monitor.h} | 40 ++--------
be/src/service/http_service.cpp | 6 ++
regression-test/pipeline/p0/conf/be.conf | 3 +
10 files changed, 144 insertions(+), 92 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index a5677721326..96ce3cd6fa1 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1165,6 +1165,9 @@ DEFINE_Bool(enable_flush_file_cache_async, "true");
// cgroup
DEFINE_mString(doris_cgroup_cpu_path, "");
+// Whether Daemon::start() launches the background thread that periodically logs the
+// BE thread count grouped by thread name.
+DEFINE_mBool(enable_be_proc_monitor, "false");
+// Period of that thread-count log, in milliseconds.
+DEFINE_mInt32(be_proc_monitor_interval_ms, "10000");
+
DEFINE_mBool(enable_workload_group_memory_gc, "true");
DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0d424940cfc..16fc4a08d67 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1248,6 +1248,8 @@ DECLARE_mBool(exit_on_exception);
// cgroup
DECLARE_mString(doris_cgroup_cpu_path);
+// Periodically log the BE thread count grouped by thread name (default: false).
+DECLARE_mBool(enable_be_proc_monitor);
+// Period of the BE thread-count log, in milliseconds (default: 10000).
+DECLARE_mInt32(be_proc_monitor_interval_ms);
DECLARE_mBool(enable_workload_group_memory_gc);
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 757b1605688..c97904f5677 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -48,6 +48,7 @@
#include "olap/options.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
+#include "runtime/be_proc_monitor.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
@@ -399,6 +400,13 @@ void Daemon::wg_mem_used_refresh_thread() {
}
}
+// Background loop of the BE thread monitor: logs BeProcMonitor::get_be_thread_info()
+// every be_proc_monitor_interval_ms until _stop_background_threads_latch is counted
+// down (presumably by Daemon::stop()).
+// Both configs are mutable (mBool / mInt32), so they are re-read on every round:
+// clearing enable_be_proc_monitor at runtime silences the log, and a non-positive
+// interval falls back to 1000 ms instead of re-scanning /proc back-to-back.
+void Daemon::be_proc_monitor_thread() {
+    auto next_wait = []() {
+        const int32_t interval_ms = config::be_proc_monitor_interval_ms;
+        return std::chrono::milliseconds(interval_ms > 0 ? interval_ms : 1000);
+    };
+    while (!_stop_background_threads_latch.wait_for(next_wait())) {
+        if (config::enable_be_proc_monitor) {
+            LOG(INFO) << "log be thread num, " << BeProcMonitor::get_be_thread_info();
+        }
+    }
+}
+
void Daemon::start() {
Status st;
st = Thread::create(
@@ -435,6 +443,12 @@ void Daemon::start() {
st = Thread::create(
"Daemon", "wg_mem_refresh_thread", [this]() {
this->wg_mem_used_refresh_thread(); },
&_threads.emplace_back());
+    // Check the status of the thread created just above before `st` is reused below;
+    // otherwise a failure to create it would be overwritten and never reported.
+    CHECK(st.ok()) << st;
+
+    if (config::enable_be_proc_monitor) {
+        st = Thread::create(
+                "Daemon", "be_proc_monitor_thread", [this]() { this->be_proc_monitor_thread(); },
+                &_threads.emplace_back());
+    }
CHECK(st.ok()) << st;
}
diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h
index 0c282a8516a..9dfb079b904 100644
--- a/be/src/common/daemon.h
+++ b/be/src/common/daemon.h
@@ -45,6 +45,7 @@ private:
void je_purge_dirty_pages_thread() const;
void report_runtime_query_statistics_thread();
void wg_mem_used_refresh_thread();
+ // Background loop that periodically logs BeProcMonitor's thread summary; start()
+ // only launches it when config::enable_be_proc_monitor is set.
+ void be_proc_monitor_thread();
CountDownLatch _stop_background_threads_latch;
std::vector<scoped_refptr<Thread>> _threads;
diff --git a/be/src/common/daemon.h
b/be/src/http/action/be_proc_thread_action.cpp
similarity index 51%
copy from be/src/common/daemon.h
copy to be/src/http/action/be_proc_thread_action.cpp
index 0c282a8516a..a0762b3f316 100644
--- a/be/src/common/daemon.h
+++ b/be/src/http/action/be_proc_thread_action.cpp
@@ -15,38 +15,20 @@
// specific language governing permissions and limitations
// under the License.
-#pragma once
+#include "http/action/be_proc_thread_action.h"
-#include <vector>
-
-#include "gutil/ref_counted.h"
-#include "util/countdown_latch.h"
-#include "util/thread.h"
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "http/http_status.h"
+#include "runtime/be_proc_monitor.h"
namespace doris {
-class Daemon {
-public:
- Daemon() : _stop_background_threads_latch(1) {}
- ~Daemon() = default;
-
- // Start background threads
- void start();
-
- // Stop background threads
- void stop();
+// MIME type for JSON response bodies.
+const static std::string HEADER_JSON = "application/json";
-private:
- void tcmalloc_gc_thread();
- void memory_maintenance_thread();
- void memory_gc_thread();
- void memtable_memory_refresh_thread();
- void calculate_metrics_thread();
- void je_purge_dirty_pages_thread() const;
- void report_runtime_query_statistics_thread();
- void wg_mem_used_refresh_thread();
+// Replies 200 with the BE thread summary from BeProcMonitor::get_be_thread_info().
+// That body is a JSON array (or an empty string on failure), so it is served as
+// application/json rather than the Prometheus text type "text/plain; version=0.0.4".
+void BeProcThreadAction::handle(HttpRequest* req) {
+    req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.c_str());
+    HttpChannel::send_reply(req, HttpStatus::OK, BeProcMonitor::get_be_thread_info());
+}
- CountDownLatch _stop_background_threads_latch;
- std::vector<scoped_refptr<Thread>> _threads;
-};
-} // namespace doris
+}; // namespace doris
\ No newline at end of file
diff --git a/be/src/common/daemon.h b/be/src/http/action/be_proc_thread_action.h
similarity index 53%
copy from be/src/common/daemon.h
copy to be/src/http/action/be_proc_thread_action.h
index 0c282a8516a..923bc3f56d9 100644
--- a/be/src/common/daemon.h
+++ b/be/src/http/action/be_proc_thread_action.h
@@ -14,39 +14,20 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-
#pragma once
-#include <vector>
-
-#include "gutil/ref_counted.h"
-#include "util/countdown_latch.h"
-#include "util/thread.h"
+#include "http/http_handler.h"
+#include "http/http_request.h"
namespace doris {
-class Daemon {
-public:
- Daemon() : _stop_background_threads_latch(1) {}
- ~Daemon() = default;
-
- // Start background threads
- void start();
+class HttpRequest;
- // Stop background threads
- void stop();
-
-private:
- void tcmalloc_gc_thread();
- void memory_maintenance_thread();
- void memory_gc_thread();
- void memtable_memory_refresh_thread();
- void calculate_metrics_thread();
- void je_purge_dirty_pages_thread() const;
- void report_runtime_query_statistics_thread();
- void wg_mem_used_refresh_thread();
-
- CountDownLatch _stop_background_threads_latch;
- std::vector<scoped_refptr<Thread>> _threads;
+// HTTP handler that replies with the BE thread summary computed by
+// BeProcMonitor::get_be_thread_info(). It holds no state, so the implicitly
+// generated default constructor and (virtual, via HttpHandler) destructor suffice.
+class BeProcThreadAction : public HttpHandler {
+public:
+    void handle(HttpRequest* req) override;
};
-} // namespace doris
+
+}; // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/be_proc_monitor.cpp
b/be/src/runtime/be_proc_monitor.cpp
new file mode 100644
index 00000000000..2a850fc8b04
--- /dev/null
+++ b/be/src/runtime/be_proc_monitor.cpp
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/be_proc_monitor.h"
+
+#include <fmt/format.h>
+#include <glog/logging.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <deque>
+#include <filesystem>
+#include <fstream>
+#include <iterator>
+#include <map>
+#include <nlohmann/json.hpp>
+#include <string>
+#include <system_error>
+#include <utility>
+#include <vector>
+
+// Builds a snapshot of this BE process's threads, grouped by thread name.
+//
+// Returns a JSON array of [key, value] pairs:
+//   ["be_process_id", <pid>], ["total_thread_count", <n>],
+//   ["distinct_thread_name_count", <m>], then one [thread_name, count] pair per
+//   distinct name, most frequent first (equal counts ordered by name).
+// Returns an empty string if /proc/<pid>/task is missing or cannot be listed.
+//
+// Threads start and exit while the directory is scanned, so a task whose `comm`
+// file can no longer be opened is skipped: the result is a best-effort snapshot.
+// The non-throwing std::error_code overloads of std::filesystem are used so that a
+// /proc error is logged instead of escaping as an exception (this is called from a
+// daemon thread and from an HTTP handler).
+std::string BeProcMonitor::get_be_thread_info() {
+    const int32_t pid = getpid();
+    const std::string proc_path = fmt::format("/proc/{}/task", pid);
+    if (access(proc_path.c_str(), F_OK) != 0) {
+        LOG(WARNING) << "be proc path " << proc_path << " not exists.";
+        return "";
+    }
+
+    std::map<std::string, int> thread_num_map;
+    int total_thread_count = 0;
+
+    std::error_code ec;
+    std::filesystem::directory_iterator it(proc_path, ec);
+    for (; !ec && it != std::filesystem::directory_iterator(); it.increment(ec)) {
+        // std::ifstream closes the file in its destructor.
+        std::ifstream file((it->path() / "comm").string());
+        if (!file.is_open()) {
+            continue; // the thread exited after it was listed
+        }
+        std::string thread_name((std::istreambuf_iterator<char>(file)),
+                                std::istreambuf_iterator<char>());
+        thread_name.erase(std::remove(thread_name.begin(), thread_name.end(), '\n'),
+                          thread_name.end());
+        ++thread_num_map[thread_name];
+        ++total_thread_count;
+    }
+    if (ec) {
+        LOG(WARNING) << "failed to list be proc path " << proc_path << ": " << ec.message();
+        return "";
+    }
+
+    // Most frequent names first. stable_sort keeps the map's name order among equal
+    // counts, so the output is deterministic.
+    std::vector<std::pair<std::string, int>> name_counts(thread_num_map.begin(),
+                                                         thread_num_map.end());
+    std::stable_sort(name_counts.begin(), name_counts.end(),
+                     [](const auto& lhs, const auto& rhs) { return lhs.second > rhs.second; });
+
+    nlohmann::json js = nlohmann::json::array();
+    js.push_back({"be_process_id", pid});
+    js.push_back({"total_thread_count", total_thread_count});
+    js.push_back({"distinct_thread_name_count", static_cast<int>(thread_num_map.size())});
+    for (const auto& [name, count] : name_counts) {
+        js.push_back({name, count});
+    }
+    return js.dump();
+}
\ No newline at end of file
diff --git a/be/src/common/daemon.h b/be/src/runtime/be_proc_monitor.h
similarity index 50%
copy from be/src/common/daemon.h
copy to be/src/runtime/be_proc_monitor.h
index 0c282a8516a..dfd962be012 100644
--- a/be/src/common/daemon.h
+++ b/be/src/runtime/be_proc_monitor.h
@@ -15,38 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-#pragma once
-
-#include <vector>
-
-#include "gutil/ref_counted.h"
-#include "util/countdown_latch.h"
-#include "util/thread.h"
-
-namespace doris {
-
-class Daemon {
+#pragma once
+
+#include <string>
+
+// BeProcMonitor reads /proc/<pid>/task to summarize the threads of this BE process.
+// Logging that summary periodically makes it possible to find which component created
+// too many threads when the BE cores because of thread exhaustion.
+// NOTE(review): unlike the rest of BE this class is in the global namespace, not in
+// namespace doris; moving it requires changing be_proc_monitor.cpp at the same time.
+class BeProcMonitor {
public:
+    // Returns a JSON array of [name, value] pairs: be_process_id, total_thread_count and
+    // distinct_thread_name_count, followed by one [thread_name, count] pair per distinct
+    // thread name, most frequent first. Returns an empty string on failure (e.g. when
+    // /proc/<pid>/task does not exist).
+    static std::string get_be_thread_info();
+};
\ No newline at end of file
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 651323f7880..cb4d36f050a 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -30,6 +30,7 @@
#include "common/status.h"
#include "http/action/adjust_log_level.h"
#include "http/action/adjust_tracing_dump.h"
+#include "http/action/be_proc_thread_action.h"
#include "http/action/calc_file_crc_action.h"
#include "http/action/check_rpc_channel_action.h"
#include "http/action/check_tablet_segment_action.h"
@@ -175,6 +176,11 @@ Status HttpService::start() {
_ev_http_server->register_handler(HttpMethod::GET,
"/api/query_pipeline_tasks/{query_id}",
query_pipeline_task_action);
+ // GET /api/be_process_thread_num: JSON summary of this BE's threads (total count and
+ // count per thread name), served by BeProcThreadAction.
+ BeProcThreadAction* be_proc_thread_action = _pool.add(new
BeProcThreadAction());
+ _ev_http_server->register_handler(HttpMethod::GET,
"/api/be_process_thread_num",
+ be_proc_thread_action);
+
// Register BE LoadStream action
LoadStreamAction* load_stream_action = _pool.add(new LoadStreamAction());
_ev_http_server->register_handler(HttpMethod::GET, "/api/load_streams",
load_stream_action);
diff --git a/regression-test/pipeline/p0/conf/be.conf
b/regression-test/pipeline/p0/conf/be.conf
index d5e447e8601..3720da6f741 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -64,3 +64,6 @@ trino_connector_plugin_dir=/tmp/trino_connector/connectors
enable_jvm_monitor = true
+enable_be_proc_monitor = true
+be_proc_monitor_interval_ms = 30000
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]