This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3e1e8db173 [fix](exec) fix thread token shutdown (#14418)
3e1e8db173 is described below
commit 3e1e8db173bc1eb36a4ffa84d2d011ff8e1f8721
Author: zhannngchen <[email protected]>
AuthorDate: Sun Nov 20 00:04:48 2022 +0800
[fix](exec) fix thread token shutdown (#14418)
Fix the "Thread pool token was shut down" error.
This happens because, when a query has more than one fragment on a single BE,
the thread token may be
reset incorrectly, causing the thread token to be shut down prematurely.
cherry-pick from master
Introduced by #13021
---
be/src/runtime/fragment_mgr.cpp | 11 ++++++-----
be/src/runtime/plan_fragment_executor.cpp | 4 +++-
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java | 5 +++--
3 files changed, 12 insertions(+), 8 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 57d2a05f1e..29cdd17320 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -255,8 +255,9 @@ Status FragmentExecState::execute() {
CgroupsMgr::apply_system_cgroup();
opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start
executing Fragment");
Status status = _executor.open();
- WARN_IF_ERROR(status, strings::Substitute("Got error while opening
fragment $0",
-
print_id(_fragment_instance_id)));
+ WARN_IF_ERROR(status,
+ strings::Substitute("Got error while opening fragment
$0, query id: $1",
+ print_id(_fragment_instance_id),
print_id(_query_id)));
_executor.close();
if (!status.ok()) {
@@ -403,7 +404,8 @@ void FragmentExecState::coordinator_callback(const Status&
status, RuntimeProfil
<< apache::thrift::ThriftDebugString(params).c_str();
if (!exec_status.ok()) {
LOG(WARNING) << "report error status: " << exec_status.to_string()
- << " to coordinator: " << _coord_addr;
+ << " to coordinator: " << _coord_addr << ", query id: "
<< print_id(_query_id)
+ << ", instance id: " << print_id(_fragment_instance_id);
}
try {
try {
@@ -630,7 +632,6 @@ Status FragmentMgr::exec_plan_fragment(const
TExecPlanFragmentParams& params, Fi
BackendOptions::get_localhost());
}
fragments_ctx = search->second;
- _set_scan_concurrency(params, fragments_ctx.get());
} else {
// This may be a first fragment request of the query.
// Create the query fragments context.
@@ -689,7 +690,7 @@ Status FragmentMgr::exec_plan_fragment(const
TExecPlanFragmentParams& params, Fi
<< print_id(fragments_ctx->query_id)
<< " limit: " << PrettyPrinter::print(bytes_limit,
TUnit::BYTES);
} else {
- // Already has a query fragmentscontext, use it
+ // Already has a query fragments context, use it
fragments_ctx = search->second;
}
}
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index 8209c82e83..3a9206f1fa 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -626,7 +626,9 @@ void PlanFragmentExecutor::update_status(const Status&
new_status) {
void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason,
const std::string& msg) {
LOG_INFO("PlanFragmentExecutor::cancel")
.tag("query_id", _query_id)
- .tag("instance_id", _runtime_state->fragment_instance_id());
+ .tag("instance_id", _runtime_state->fragment_instance_id())
+ .tag("reason", reason)
+ .tag("error message", msg);
DCHECK(_prepared);
_cancel_reason = reason;
_cancel_msg = msg;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 035ed03cfe..a89aa3ef42 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1695,8 +1695,9 @@ public class Coordinator {
// and returned_all_results_ is true.
// (UpdateStatus() initiates cancellation, if it hasn't already been
initiated)
if (!(returnedAllResults && status.isCancelled()) && !status.ok()) {
- LOG.warn("one instance report fail, query_id={} instance_id={}",
- DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()));
+ LOG.warn("one instance report fail, query_id={} instance_id={},
error message: {}",
+ DebugUtil.printId(queryId),
DebugUtil.printId(params.getFragmentInstanceId()),
+ status.getErrorMsg());
updateStatus(status, params.getFragmentInstanceId());
}
if (execState.done) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]