This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fae2b4b4fe1 [pipeline](API) Add a new API to find pipeline tasks by a
specific query ID (#35563)
fae2b4b4fe1 is described below
commit fae2b4b4fe11923aea623f00e683ab147169ba5c
Author: Gabriel <[email protected]>
AuthorDate: Wed May 29 15:03:56 2024 +0800
[pipeline](API) Add a new API to find pipeline tasks by a specific query ID
(#35563)
---
be/src/http/action/pipeline_task_action.cpp | 30 +++++++++++++++++++++++++++++
be/src/http/action/pipeline_task_action.h | 9 +++++++++
be/src/runtime/fragment_mgr.cpp | 8 ++++++++
be/src/runtime/fragment_mgr.h | 1 +
be/src/runtime/query_context.cpp | 27 ++++++++++++++++++++++++++
be/src/runtime/query_context.h | 1 +
be/src/service/http_service.cpp | 5 +++++
7 files changed, 81 insertions(+)
diff --git a/be/src/http/action/pipeline_task_action.cpp
b/be/src/http/action/pipeline_task_action.cpp
index b19b42c9468..b6a7cabe514 100644
--- a/be/src/http/action/pipeline_task_action.cpp
+++ b/be/src/http/action/pipeline_task_action.cpp
@@ -56,4 +56,34 @@ void LongPipelineTaskAction::handle(HttpRequest* req) {
ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks(duration));
}
+// Handles GET /api/query_pipeline_tasks/{query_id}: replies with a plain-text dump of the
+// pipeline tasks that belong to a single query.
+//
+// {query_id} must have the form "{hi}-{lo}": two 16-digit hexadecimal numbers joined by '-'.
+// A malformed id is a client mistake, so it is rejected with 400 BAD_REQUEST instead of being
+// reported as a server-side failure.
+void QueryPipelineTaskAction::handle(HttpRequest* req) {
+    req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; version=0.0.4");
+    // Each half of the id is a 64-bit value written as 16 hexadecimal digits.
+    constexpr size_t hex_digits = 16;
+    int64_t high = 0;
+    int64_t low = 0;
+    try {
+        const auto& query_id_str = req->param("query_id");
+        // Check the separator as well as the length; otherwise any 33-character string would
+        // be handed to from_hex() as if it were a well-formed id.
+        // NOTE(review): the hex digits themselves are not validated here -- confirm how
+        // from_hex() treats non-hexadecimal characters.
+        if (query_id_str.length() != hex_digits * 2 + 1 || query_id_str[hex_digits] != '-') {
+            HttpChannel::send_reply(
+                    req, HttpStatus::BAD_REQUEST,
+                    "Invalid query id! Query id should be {hi}-{lo} which is a hexadecimal. \n");
+            return;
+        }
+        from_hex(&high, query_id_str.substr(0, hex_digits));
+        from_hex(&low, query_id_str.substr(hex_digits + 1));
+    } catch (const std::exception& e) {
+        // Format the message once and reuse it for both the log line and the HTTP reply.
+        const std::string error_msg =
+                fmt::format("invalid argument.query_id: {}, meet error: {}. \n",
+                            req->param("query_id"), e.what());
+        LOG(WARNING) << error_msg;
+        HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, error_msg);
+        return;
+    }
+    TUniqueId query_id;
+    query_id.hi = high;
+    query_id.lo = low;
+    HttpChannel::send_reply(req, HttpStatus::OK,
+                            ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks(query_id));
+}
+
} // end namespace doris
diff --git a/be/src/http/action/pipeline_task_action.h
b/be/src/http/action/pipeline_task_action.h
index 553ac856e6f..23c1a17464f 100644
--- a/be/src/http/action/pipeline_task_action.h
+++ b/be/src/http/action/pipeline_task_action.h
@@ -41,4 +41,13 @@ public:
void handle(HttpRequest* req) override;
};
+// HTTP handler for per-query pipeline task lookups. It carries no state of its own; the
+// request handling is defined out of line in handle().
+class QueryPipelineTaskAction : public HttpHandler {
+public:
+    // Construction and destruction need nothing beyond the defaults.
+    QueryPipelineTaskAction() = default;
+    ~QueryPipelineTaskAction() override = default;
+
+    // Serves a single HTTP request.
+    void handle(HttpRequest* req) override;
+};
+
} // end namespace doris
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 375cfd5eb1e..ad8bbefa26e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -848,6 +848,14 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t
duration) {
return fmt::to_string(debug_string_buffer);
}
+// Returns the pipeline context report of the query identified by `query_id`, or a
+// "not found" message when the lookup yields no query context.
+std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
+    auto query_ctx = _get_or_erase_query_ctx(query_id);
+    if (!query_ctx) {
+        // No context came back for this id: report that instead of a dump.
+        return fmt::format("Query context (query id = {}) not found. \n", print_id(query_id));
+    }
+    return query_ctx->print_all_pipeline_context();
+}
+
Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
const FinishCallback& cb) {
VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment
params is "
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index c8298ee67b7..b75c6c35ad8 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -146,6 +146,7 @@ public:
}
std::string dump_pipeline_tasks(int64_t duration = 0);
+ std::string dump_pipeline_tasks(TUniqueId& query_id);
void get_runtime_query_info(std::vector<WorkloadQueryInfo>*
_query_info_list);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 490d0886533..dcc74c40e1c 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -247,6 +247,33 @@ void QueryContext::cancel_all_pipeline_context(const
Status& reason) {
}
}
+// Builds a human-readable report of this query's pipeline fragment contexts: a header line
+// with the number of registered contexts, followed by one numbered entry (elapsed seconds,
+// fragment id, debug string) for every context that can still be locked.
+std::string QueryContext::print_all_pipeline_context() {
+    std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_print;
+    {
+        // Snapshot the map while holding its lock, then release the lock before the
+        // per-context debug_string() calls below. The header count is taken from this
+        // snapshot as well: reading _fragment_id_to_pipeline_ctx.size() without the lock
+        // could race with a concurrent modification of the map.
+        std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
+        ctx_to_print.reserve(_fragment_id_to_pipeline_ctx.size());
+        for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
+            ctx_to_print.push_back(f_context);
+        }
+    }
+
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts in query {}. \n",
+                   ctx_to_print.size(), print_id(_query_id));
+    size_t i = 0;
+    for (auto& f_context : ctx_to_print) {
+        // Contexts that have already expired are skipped and do not consume a number.
+        if (auto pipeline_ctx = f_context.lock()) {
+            // presumably elapsed_time() is in nanoseconds, hence the division -- TODO confirm
+            auto elapsed = pipeline_ctx->elapsed_time() / 1000000000.0;
+            fmt::format_to(debug_string_buffer,
+                           "No.{} (elapse_second={}s, fragment_id={}) : {}\n", i, elapsed,
+                           pipeline_ctx->get_fragment_id(), pipeline_ctx->debug_string());
+            i++;
+        }
+    }
+    return fmt::to_string(debug_string_buffer);
+}
+
Status QueryContext::cancel_pipeline_context(const int fragment_id, const
Status& reason) {
std::weak_ptr<pipeline::PipelineFragmentContext> ctx_to_cancel;
{
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index ff5d1a549ea..318afd69187 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -100,6 +100,7 @@ public:
[[nodiscard]] bool is_cancelled() const { return !_exec_status.ok(); }
void cancel_all_pipeline_context(const Status& reason);
+ std::string print_all_pipeline_context();
Status cancel_pipeline_context(const int fragment_id, const Status&
reason);
void set_pipeline_context(const int fragment_id,
std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx);
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 8788877ec8d..ee5b4174387 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -161,6 +161,11 @@ Status HttpService::start() {
_ev_http_server->register_handler(HttpMethod::GET,
"/api/running_pipeline_tasks/{duration}",
long_pipeline_task_action);
+ // Dump the pipeline tasks of the query identified by {query_id}
+ QueryPipelineTaskAction* query_pipeline_task_action = _pool.add(new
QueryPipelineTaskAction());
+ _ev_http_server->register_handler(HttpMethod::GET,
"/api/query_pipeline_tasks/{query_id}",
+ query_pipeline_task_action);
+
// Register Tablets Info action
TabletsInfoAction* tablets_info_action =
_pool.add(new TabletsInfoAction(_env, TPrivilegeHier::GLOBAL,
TPrivilegeType::ADMIN));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]