This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 150d8bb60fc [branch-2.0](pick 27738) Warning log to trace send fragment #27738 (#27760)
150d8bb60fc is described below
commit 150d8bb60fcc06323f2a416764d162ebb72d0fb5
Author: zhiqiang <[email protected]>
AuthorDate: Thu Nov 30 08:40:09 2023 +0800
[branch-2.0](pick 27738) Warning log to trace send fragment #27738 (#27760)
---
be/src/service/internal_service.cpp | 28 ++++++++++++++++++++--
.../main/java/org/apache/doris/qe/Coordinator.java | 27 ++++++++++++++++-----
2 files changed, 47 insertions(+), 8 deletions(-)
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 4ac98dbf25f..df88be44ba7 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -458,10 +458,22 @@ Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_req
uint32_t len = ser_request.size();
RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact,
&t_request));
}
+ const auto& fragment_list = t_request.paramsList;
+ MonotonicStopWatch timer;
+ timer.start();
for (const TExecPlanFragmentParams& params : t_request.paramsList) {
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
}
+
+ timer.stop();
+ double cost_secs = static_cast<double>(timer.elapsed_time()) /
1000000000ULL;
+ if (cost_secs > 5) {
+ LOG_WARNING("Prepare {} fragments of query {} costs {} seconds, it
costs too much",
+ fragment_list.size(),
print_id(fragment_list.front().params.query_id),
+ cost_secs);
+ }
+
return Status::OK();
} else if (version == PFragmentRequestVersion::VERSION_3) {
TPipelineFragmentParamsList t_request;
@@ -471,9 +483,21 @@ Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& ser_req
RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact,
&t_request));
}
- for (const TPipelineFragmentParams& params : t_request.params_list) {
-
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(params));
+ const auto& fragment_list = t_request.params_list;
+ MonotonicStopWatch timer;
+ timer.start();
+
+ for (const TPipelineFragmentParams& fragment : fragment_list) {
+
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(fragment));
+ }
+
+ timer.stop();
+ double cost_secs = static_cast<double>(timer.elapsed_time()) /
1000000000ULL;
+ if (cost_secs > 5) {
+ LOG_WARNING("Prepare {} fragments of query {} costs {} seconds, it
costs too much",
+ fragment_list.size(),
print_id(fragment_list.front().query_id), cost_secs);
}
+
return Status::OK();
} else {
return Status::InternalError("invalid version");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index af8eb518e9d..cf3175bc72d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -875,8 +875,13 @@ public class Coordinator implements CoordInterface {
long leftTimeMs,
String operation) throws RpcException, UserException {
if (leftTimeMs <= 0) {
- throw new UserException("timeout before waiting for " + operation
+ " RPC. Elapse(sec): " + (
- (System.currentTimeMillis() - timeoutDeadline) / 1000 +
queryOptions.getExecutionTimeout()));
+ long elapsed = (System.currentTimeMillis() - timeoutDeadline) /
1000 + queryOptions.getExecutionTimeout();
+ String msg = String.format(
+ "timeout before waiting {} rpc, query timeout: {}, already
elapsed:{}, left for this:{}",
+ operation, queryOptions.getExecutionTimeout(),
elapsed, leftTimeMs);
+
+ LOG.warn("Query {} {}", DebugUtil.printId(queryId), msg);
+ throw new UserException(msg);
}
long timeoutMs = Math.min(leftTimeMs,
Config.remote_fragment_exec_timeout_ms);
@@ -904,7 +909,10 @@ public class Coordinator implements CoordInterface {
code = TStatusCode.INTERNAL_ERROR;
} catch (TimeoutException e) {
exception = e;
- errMsg = "timeout when waiting for " + operation + " RPC.
Wait(sec): " + timeoutMs / 1000;
+ errMsg = String.format(
+ "timeout when waiting for {} rpc, query timeout {}, left
timeout for this operation: {}",
+ operation, queryOptions.getExecutionTimeout(), timeoutMs /
10000);
+ LOG.warn("Query {} {}", DebugUtil.printId(queryId), errMsg);
code = TStatusCode.TIMEOUT;
}
@@ -942,8 +950,12 @@ public class Coordinator implements CoordInterface {
Future<PExecPlanFragmentResult>>> futures, long leftTimeMs,
String operation) throws RpcException, UserException {
if (leftTimeMs <= 0) {
- throw new UserException("timeout before waiting for " + operation
+ " RPC. Elapse(sec): " + (
- (System.currentTimeMillis() - timeoutDeadline) / 1000 +
queryOptions.query_timeout));
+ long elapsed = (System.currentTimeMillis() - timeoutDeadline) /
1000 + queryOptions.getExecutionTimeout();
+ String msg = String.format(
+ "timeout before waiting {} rpc, query timeout: {}, already
elapsed:{}, left for this:{}",
+ operation, queryOptions.getExecutionTimeout(), elapsed,
leftTimeMs);
+ LOG.warn("Query {} {}", DebugUtil.printId(queryId), msg);
+ throw new UserException(msg);
}
long timeoutMs = Math.min(leftTimeMs,
Config.remote_fragment_exec_timeout_ms);
@@ -971,7 +983,10 @@ public class Coordinator implements CoordInterface {
code = TStatusCode.INTERNAL_ERROR;
} catch (TimeoutException e) {
exception = e;
- errMsg = "timeout when waiting for " + operation + " RPC.
Wait(sec): " + timeoutMs / 1000;
+ errMsg = String.format(
+ "timeout when waiting for {} rpc, query timeout {}, left
timeout for this operation: {}",
+ operation,
queryOptions.getExecutionTimeout(), timeoutMs / 10000);
+ LOG.warn("Query {} {}", DebugUtil.printId(queryId), errMsg);
code = TStatusCode.TIMEOUT;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]