This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fae2b4b4fe1 [pipeline](API) Add a new API to find pipeline tasks by a 
specific query ID (#35563)
fae2b4b4fe1 is described below

commit fae2b4b4fe11923aea623f00e683ab147169ba5c
Author: Gabriel <[email protected]>
AuthorDate: Wed May 29 15:03:56 2024 +0800

    [pipeline](API) Add a new API to find pipeline tasks by a specific query ID 
(#35563)
---
 be/src/http/action/pipeline_task_action.cpp | 30 +++++++++++++++++++++++++++++
 be/src/http/action/pipeline_task_action.h   |  9 +++++++++
 be/src/runtime/fragment_mgr.cpp             |  8 ++++++++
 be/src/runtime/fragment_mgr.h               |  1 +
 be/src/runtime/query_context.cpp            | 27 ++++++++++++++++++++++++++
 be/src/runtime/query_context.h              |  1 +
 be/src/service/http_service.cpp             |  5 +++++
 7 files changed, 81 insertions(+)

diff --git a/be/src/http/action/pipeline_task_action.cpp 
b/be/src/http/action/pipeline_task_action.cpp
index b19b42c9468..b6a7cabe514 100644
--- a/be/src/http/action/pipeline_task_action.cpp
+++ b/be/src/http/action/pipeline_task_action.cpp
@@ -56,4 +56,34 @@ void LongPipelineTaskAction::handle(HttpRequest* req) {
                             
ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks(duration));
 }
 
+void QueryPipelineTaskAction::handle(HttpRequest* req) {
+    req->add_output_header(HttpHeaders::CONTENT_TYPE, "text/plain; 
version=0.0.4");
+    int64_t high = 0;
+    int64_t low = 0;
+    try {
+        auto& query_id_str = req->param("query_id");
+        if (query_id_str.length() != 16 * 2 + 1) {
+            HttpChannel::send_reply(
+                    req, HttpStatus::INTERNAL_SERVER_ERROR,
+                    "Invalid query id! Query id should be {hi}-{lo} which is a 
hexadecimal. \n");
+            return;
+        }
+        from_hex(&high, query_id_str.substr(0, 16));
+        from_hex(&low, query_id_str.substr(17));
+    } catch (const std::exception& e) {
+        fmt::memory_buffer debug_string_buffer;
+        fmt::format_to(debug_string_buffer, "invalid argument.query_id: {}, 
meet error: {}. \n",
+                       req->param("query_id"), e.what());
+        LOG(WARNING) << fmt::to_string(debug_string_buffer);
+        HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
+                                fmt::to_string(debug_string_buffer));
+        return;
+    }
+    TUniqueId query_id;
+    query_id.hi = high;
+    query_id.lo = low;
+    HttpChannel::send_reply(req, HttpStatus::OK,
+                            
ExecEnv::GetInstance()->fragment_mgr()->dump_pipeline_tasks(query_id));
+}
+
 } // end namespace doris
diff --git a/be/src/http/action/pipeline_task_action.h 
b/be/src/http/action/pipeline_task_action.h
index 553ac856e6f..23c1a17464f 100644
--- a/be/src/http/action/pipeline_task_action.h
+++ b/be/src/http/action/pipeline_task_action.h
@@ -41,4 +41,13 @@ public:
     void handle(HttpRequest* req) override;
 };
 
+class QueryPipelineTaskAction : public HttpHandler {
+public:
+    QueryPipelineTaskAction() = default;
+
+    ~QueryPipelineTaskAction() override = default;
+
+    void handle(HttpRequest* req) override;
+};
+
 } // end namespace doris
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 375cfd5eb1e..ad8bbefa26e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -848,6 +848,14 @@ std::string FragmentMgr::dump_pipeline_tasks(int64_t 
duration) {
     return fmt::to_string(debug_string_buffer);
 }
 
+std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
+    if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
+        return q_ctx->print_all_pipeline_context();
+    } else {
+        return fmt::format("Query context (query id = {}) not found. \n", 
print_id(query_id));
+    }
+}
+
 Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
                                        const FinishCallback& cb) {
     VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment 
params is "
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index c8298ee67b7..b75c6c35ad8 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -146,6 +146,7 @@ public:
     }
 
     std::string dump_pipeline_tasks(int64_t duration = 0);
+    std::string dump_pipeline_tasks(TUniqueId& query_id);
 
     void get_runtime_query_info(std::vector<WorkloadQueryInfo>* 
_query_info_list);
 
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 490d0886533..dcc74c40e1c 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -247,6 +247,33 @@ void QueryContext::cancel_all_pipeline_context(const 
Status& reason) {
     }
 }
 
+std::string QueryContext::print_all_pipeline_context() {
+    std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_print;
+    fmt::memory_buffer debug_string_buffer;
+    size_t i = 0;
+    {
+        fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts in 
query {}. \n",
+                       _fragment_id_to_pipeline_ctx.size(), 
print_id(_query_id));
+
+        {
+            std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
+            for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
+                ctx_to_print.push_back(f_context);
+            }
+        }
+        for (auto& f_context : ctx_to_print) {
+            if (auto pipeline_ctx = f_context.lock()) {
+                auto elapsed = pipeline_ctx->elapsed_time() / 1000000000.0;
+                fmt::format_to(debug_string_buffer,
+                               "No.{} (elapse_second={}s, fragment_id={}) : 
{}\n", i, elapsed,
+                               pipeline_ctx->get_fragment_id(), 
pipeline_ctx->debug_string());
+                i++;
+            }
+        }
+    }
+    return fmt::to_string(debug_string_buffer);
+}
+
 Status QueryContext::cancel_pipeline_context(const int fragment_id, const 
Status& reason) {
     std::weak_ptr<pipeline::PipelineFragmentContext> ctx_to_cancel;
     {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index ff5d1a549ea..318afd69187 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -100,6 +100,7 @@ public:
     [[nodiscard]] bool is_cancelled() const { return !_exec_status.ok(); }
 
     void cancel_all_pipeline_context(const Status& reason);
+    std::string print_all_pipeline_context();
     Status cancel_pipeline_context(const int fragment_id, const Status& 
reason);
     void set_pipeline_context(const int fragment_id,
                               
std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx);
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 8788877ec8d..ee5b4174387 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -161,6 +161,11 @@ Status HttpService::start() {
     _ev_http_server->register_handler(HttpMethod::GET, 
"/api/running_pipeline_tasks/{duration}",
                                       long_pipeline_task_action);
 
+    // Dump all running pipeline tasks which has been running for more than 
{duration} seconds
+    QueryPipelineTaskAction* query_pipeline_task_action = _pool.add(new 
QueryPipelineTaskAction());
+    _ev_http_server->register_handler(HttpMethod::GET, 
"/api/query_pipeline_tasks/{query_id}",
+                                      query_pipeline_task_action);
+
     // Register Tablets Info action
     TabletsInfoAction* tablets_info_action =
             _pool.add(new TabletsInfoAction(_env, TPrivilegeHier::GLOBAL, 
TPrivilegeType::ADMIN));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to