This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 56cc9cc3048 [fix](cancel)) Fix pipeline task leak cancel (#39697)
56cc9cc3048 is described below
commit 56cc9cc304847a574116be6bc0c166fbe23f2f80
Author: zhiqiang <[email protected]>
AuthorDate: Thu Aug 22 00:40:22 2024 +0800
[fix](cancel)) Fix pipeline task leak cancel (#39697)
pick #39737
---
be/src/pipeline/pipeline_fragment_context.cpp | 7 -------
be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 8 ++++++++
be/src/runtime/fragment_mgr.cpp | 5 +++--
3 files changed, 11 insertions(+), 9 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index df7cd2ece28..4c677216e6a 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -186,13 +186,6 @@ void PipelineFragmentContext::cancel(const
PPlanFragmentCancelReason& reason,
// We need a more detail discussion.
_query_ctx->cancel(msg, Status::Cancelled(msg));
- if (reason == PPlanFragmentCancelReason::INTERNAL_ERROR && !msg.empty()) {
- if (msg.find("Pipeline task leak.") != std::string::npos) {
- LOG_WARNING("PipelineFragmentContext is cancelled due to illegal
state : {}",
- this->debug_string());
- }
- }
-
if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
_is_report_on_cancel = false;
} else {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 55a25718e00..d736879f0eb 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -147,6 +147,14 @@ void PipelineXFragmentContext::cancel(const
PPlanFragmentCancelReason& reason,
LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout
: " << debug_string();
}
_query_ctx->cancel(msg, Status::Cancelled(msg), _fragment_id);
+
+ if (reason == PPlanFragmentCancelReason::INTERNAL_ERROR && !msg.empty()) {
+ if (msg.find("Pipeline task leak.") != std::string::npos) {
+ LOG_WARNING("PipelineFragmentContext is cancelled due to illegal
state : {}",
+ this->debug_string());
+ }
+ }
+
if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
_is_report_on_cancel = false;
} else {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 74f42a28b8a..b68839a0d62 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -183,6 +183,7 @@ static Status _do_fetch_running_queries_rpc(const
FrontendInfo& fe_info,
query_set =
std::unordered_set<TUniqueId>(rpc_result.running_queries.begin(),
rpc_result.running_queries.end());
+
return Status::OK();
};
@@ -1310,8 +1311,8 @@ void FragmentMgr::cancel_worker() {
itr != running_queries_on_all_fes.end()) {
// Query not found on this frontend, and the query
arrives before the last check
if (itr->second.find(q_ctx->query_id()) ==
itr->second.end() &&
- q_ctx->get_query_arrival_timestamp().tv_nsec <
- check_invalid_query_last_timestamp.tv_nsec
&&
+ q_ctx->get_query_arrival_timestamp().tv_sec <
+ check_invalid_query_last_timestamp.tv_sec
&&
q_ctx->get_query_source() ==
QuerySource::INTERNAL_FRONTEND) {
if (q_ctx->enable_pipeline_x_exec()) {
queries_pipeline_task_leak.push_back(q_ctx->query_id());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]