This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new abcba778ff5 [fix](cancel) Fix cancel msg on branch-2.1 (#41798)
abcba778ff5 is described below
commit abcba778ff566355bd92b8f1914a058a2ae634e1
Author: zhiqiang <[email protected]>
AuthorDate: Tue Oct 15 17:15:05 2024 +0800
[fix](cancel) Fix cancel msg on branch-2.1 (#41798)
Make sure we can tell cancel reason from:
1. user cancel
2. timeout
3. others
```text
mysql [demo]>set query_timeout=1;
--------------
set query_timeout=1
--------------
Query OK, 0 rows affected (0.00 sec)
mysql [demo]>select sleep(5);
--------------
select sleep(5)
--------------
ERROR 1105 (HY000): errCode = 2, detailMessage = Timeout
mysql [demo]>select sleep(5);
--------------
select sleep(5)
--------------
^C^C -- sending "KILL QUERY 0" to server ...
^C -- query aborted
ERROR 1105 (HY000): errCode = 2, detailMessage = cancel query by user from
127.0.0.1:64208
```
---
.../pipeline_x/pipeline_x_fragment_context.cpp | 2 +-
be/src/runtime/buffer_control_block.cpp | 10 ++++----
be/src/runtime/buffer_control_block.h | 6 ++---
be/src/runtime/fragment_mgr.cpp | 12 +++++-----
be/src/runtime/result_buffer_mgr.cpp | 7 +++---
be/src/runtime/result_buffer_mgr.h | 2 +-
.../httpv2/rest/manager/QueryProfileAction.java | 2 +-
.../doris/job/extensions/insert/InsertTask.java | 2 +-
.../apache/doris/job/extensions/mtmv/MTMVTask.java | 2 +-
.../org/apache/doris/load/ExportTaskExecutor.java | 2 +-
.../java/org/apache/doris/load/loadv2/LoadJob.java | 2 +-
.../commands/insert/AbstractInsertExecutor.java | 2 +-
.../java/org/apache/doris/qe/ConnectContext.java | 8 +++----
.../java/org/apache/doris/qe/ConnectScheduler.java | 4 ++--
.../main/java/org/apache/doris/qe/Coordinator.java | 28 ++++++++++------------
.../java/org/apache/doris/qe/ResultReceiver.java | 11 +++++----
.../java/org/apache/doris/qe/StmtExecutor.java | 4 ++--
.../WorkloadActionCancelQuery.java | 2 +-
.../sessions/FlightSqlConnectContext.java | 2 +-
.../apache/doris/statistics/BaseAnalysisTask.java | 2 +-
20 files changed, 55 insertions(+), 57 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index a3dff107f1b..53dae142a6d 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -145,7 +145,7 @@ void PipelineXFragmentContext::cancel(const
PPlanFragmentCancelReason& reason,
LOG_INFO("PipelineXFragmentContext::cancel")
.tag("query_id", print_id(_query_id))
.tag("fragment_id", _fragment_id)
- .tag("reason", reason)
+ .tag("reason", PPlanFragmentCancelReason_Name(reason))
.tag("error message", msg);
if (reason == PPlanFragmentCancelReason::TIMEOUT) {
LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout
: " << debug_string();
diff --git a/be/src/runtime/buffer_control_block.cpp
b/be/src/runtime/buffer_control_block.cpp
index 4b9fa57ce64..c61c98a324b 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -104,7 +104,7 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id,
int buffer_size)
}
BufferControlBlock::~BufferControlBlock() {
- cancel();
+ cancel(Status::Cancelled("Cancelled"));
}
Status BufferControlBlock::init() {
@@ -266,13 +266,13 @@ Status BufferControlBlock::close(Status exec_status) {
return Status::OK();
}
-void BufferControlBlock::cancel() {
+void BufferControlBlock::cancel(const Status& reason) {
std::unique_lock<std::mutex> l(_lock);
_is_cancelled = true;
_data_removal.notify_all();
_data_arrival.notify_all();
for (auto& ctx : _waiting_rpc) {
- ctx->on_failure(Status::Cancelled("Cancelled"));
+ ctx->on_failure(reason);
}
_waiting_rpc.clear();
}
@@ -301,8 +301,8 @@ Status
PipBufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch
return Status::OK();
}
-void PipBufferControlBlock::cancel() {
- BufferControlBlock::cancel();
+void PipBufferControlBlock::cancel(const Status& reason) {
+ BufferControlBlock::cancel(reason);
_update_dependency();
}
diff --git a/be/src/runtime/buffer_control_block.h
b/be/src/runtime/buffer_control_block.h
index b8b3f3d163e..9e991613f2e 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -86,8 +86,8 @@ public:
// close buffer block, set _status to exec_status and set _is_close to
true;
// called because data has been read or error happened.
Status close(Status exec_status);
- // this is called by RPC, called from coordinator
- virtual void cancel();
+
+ virtual void cancel(const Status& reason);
[[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; }
@@ -152,7 +152,7 @@ public:
Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result)
override;
- void cancel() override;
+ void cancel(const Status& reason) override;
void set_dependency(std::shared_ptr<pipeline::Dependency>
result_sink_dependency);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index acf622b4196..4696814e55d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1249,7 +1249,7 @@ void FragmentMgr::cancel_worker() {
clock_gettime(CLOCK_MONOTONIC, &check_invalid_query_last_timestamp);
do {
- std::vector<TUniqueId> to_cancel;
+ std::vector<TUniqueId> queries_timeout;
std::vector<TUniqueId> queries_to_cancel;
std::vector<TUniqueId> queries_pipeline_task_leak;
// Fe process uuid -> set<QueryId>
@@ -1274,7 +1274,7 @@ void FragmentMgr::cancel_worker() {
std::lock_guard<std::mutex> lock(_lock);
for (auto& fragment_instance_itr : _fragment_instance_map) {
if (fragment_instance_itr.second->is_timeout(now)) {
-
to_cancel.push_back(fragment_instance_itr.second->fragment_instance_id());
+
queries_timeout.push_back(fragment_instance_itr.second->fragment_instance_id());
}
}
for (auto& pipeline_itr : _pipeline_map) {
@@ -1283,7 +1283,7 @@ void FragmentMgr::cancel_worker() {
reinterpret_cast<pipeline::PipelineXFragmentContext*>(pipeline_itr.second.get())
->instance_ids(ins_ids);
for (auto& ins_id : ins_ids) {
- to_cancel.push_back(ins_id);
+ queries_timeout.push_back(ins_id);
}
} else {
pipeline_itr.second->clear_finished_tasks();
@@ -1393,9 +1393,9 @@ void FragmentMgr::cancel_worker() {
// TODO(zhiqiang): It seems that timeout_canceled_fragment_count is
// designed to count canceled fragment of non-pipeline query.
- timeout_canceled_fragment_count->increment(to_cancel.size());
- for (auto& id : to_cancel) {
- cancel_instance(id, PPlanFragmentCancelReason::TIMEOUT);
+ timeout_canceled_fragment_count->increment(queries_timeout.size());
+ for (auto& id : queries_timeout) {
+ cancel_instance(id, PPlanFragmentCancelReason::TIMEOUT, "Query
timeout");
LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout
instance "
<< print_id(id);
}
diff --git a/be/src/runtime/result_buffer_mgr.cpp
b/be/src/runtime/result_buffer_mgr.cpp
index 3d96c1871b9..f81c9b1094f 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -32,6 +32,7 @@
#include "arrow/record_batch.h"
#include "arrow/type_fwd.h"
+#include "common/status.h"
#include "runtime/buffer_control_block.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
@@ -150,13 +151,13 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId&
finst_id,
return Status::OK();
}
-void ResultBufferMgr::cancel(const TUniqueId& query_id) {
+void ResultBufferMgr::cancel(const TUniqueId& query_id, const Status& reason) {
{
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
BufferMap::iterator iter = _buffer_map.find(query_id);
if (_buffer_map.end() != iter) {
- iter->second->cancel();
+ iter->second->cancel(reason);
_buffer_map.erase(iter);
}
}
@@ -206,7 +207,7 @@ void ResultBufferMgr::cancel_thread() {
// cancel query
for (int i = 0; i < query_to_cancel.size(); ++i) {
- cancel(query_to_cancel[i]);
+ cancel(query_to_cancel[i], Status::TimedOut("Query tiemout"));
}
} while
(!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
diff --git a/be/src/runtime/result_buffer_mgr.h
b/be/src/runtime/result_buffer_mgr.h
index e6ae0cc1042..18846684233 100644
--- a/be/src/runtime/result_buffer_mgr.h
+++ b/be/src/runtime/result_buffer_mgr.h
@@ -71,7 +71,7 @@ public:
std::shared_ptr<arrow::Schema> find_arrow_schema(const TUniqueId&
query_id);
// cancel
- void cancel(const TUniqueId& fragment_id);
+ void cancel(const TUniqueId& query_id, const Status& reason);
// cancel one query at a future time.
void cancel_at_time(time_t cancel_time, const TUniqueId& query_id);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
index 0623932bc9d..d6bb25e9533 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
@@ -576,7 +576,7 @@ public class QueryProfileAction extends RestBaseController {
}
ExecuteEnv env = ExecuteEnv.getInstance();
- env.getScheduler().cancelQuery(queryId);
+ env.getScheduler().cancelQuery(queryId, "cancel query by rest api");
return ResponseEntityBuilder.ok();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index ee5abed8392..b4f52808f4b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -221,7 +221,7 @@ public class InsertTask extends AbstractTask {
}
isCanceled.getAndSet(true);
if (null != stmtExecutor) {
- stmtExecutor.cancel();
+ stmtExecutor.cancel("insert task cancelled");
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 59a421509d9..966291bd7aa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -258,7 +258,7 @@ public class MTMVTask extends AbstractTask {
protected synchronized void executeCancelLogic() {
LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
if (executor != null) {
- executor.cancel();
+ executor.cancel("mtmv task cancelled");
}
after();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
index 1424f3bc301..0e434b0b820 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java
@@ -162,7 +162,7 @@ public class ExportTaskExecutor implements
TransientTaskExecutor {
}
isCanceled.getAndSet(true);
if (stmtExecutor != null) {
- stmtExecutor.cancel();
+ stmtExecutor.cancel("export task cancelled");
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 737eb33b584..f02c0b289b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -600,7 +600,7 @@ public abstract class LoadJob extends
AbstractTxnStateChangeCallback implements
for (TUniqueId loadId : loadIds) {
Coordinator coordinator =
QeProcessorImpl.INSTANCE.getCoordinator(loadId);
if (coordinator != null) {
- coordinator.cancel();
+ coordinator.cancel(failMsg.getMsg());
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index 064b9abfcac..58a45031ffa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -143,7 +143,7 @@ public abstract class AbstractInsertExecutor {
}
boolean notTimeout = coordinator.join(execTimeout);
if (!coordinator.isDone()) {
- coordinator.cancel();
+ coordinator.cancel("insert timeout");
if (notTimeout) {
errMsg = coordinator.getExecStatus().getErrorMsg();
ErrorReport.reportDdlException("there exists unhealthy
backend. "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index ba53c14e1dc..c5622d54a14 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -935,7 +935,7 @@ public class ConnectContext {
closeChannel();
}
// Now, cancel running query.
- cancelQuery();
+ cancelQuery("cancel query by user from " + getRemoteHostPortString());
}
// kill operation with no protect by timeout.
@@ -956,10 +956,10 @@ public class ConnectContext {
}
}
- public void cancelQuery() {
+ public void cancelQuery(String cancelMessage) {
StmtExecutor executorRef = executor;
if (executorRef != null) {
- executorRef.cancel();
+ executorRef.cancel(cancelMessage);
}
}
@@ -990,7 +990,7 @@ public class ConnectContext {
long timeout = getExecTimeout() * 1000L;
if (delta > timeout) {
LOG.warn("kill {} timeout, remote: {}, query timeout: {},
query id: {}",
- timeoutTag, getRemoteHostPortString(), timeout,
queryId);
+ timeoutTag, getRemoteHostPortString(), timeout,
DebugUtil.printId(queryId));
killFlag = true;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index 01b41ec3e96..5da6bb1ba95 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -145,11 +145,11 @@ public class ConnectScheduler {
return null;
}
- public void cancelQuery(String queryId) {
+ public void cancelQuery(String queryId, String cancelReason) {
for (ConnectContext ctx : connectionMap.values()) {
TUniqueId qid = ctx.queryId();
if (qid != null && DebugUtil.printId(qid).equals(queryId)) {
- ctx.cancelQuery();
+ ctx.cancelQuery(cancelReason);
break;
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index b0d1cbaba05..7d9c8243c04 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1164,7 +1164,7 @@ public class Coordinator implements CoordInterface {
errMsg = operation + " failed. " + exception.getMessage();
}
queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
- cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+ cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR,
errMsg);
switch (code) {
case TIMEOUT:
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
@@ -1259,7 +1259,7 @@ public class Coordinator implements CoordInterface {
errMsg = operation + " failed. " + exception.getMessage();
}
queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
- cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+ cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR,
errMsg);
switch (code) {
case TIMEOUT:
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
@@ -1385,9 +1385,9 @@ public class Coordinator implements CoordInterface {
queryStatus.updateStatus(status.getErrorCode(),
status.getErrorMsg());
if (status.getErrorCode() == TStatusCode.TIMEOUT) {
- cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT);
+ cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT,
status.getErrorMsg());
} else {
- cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+ cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR,
status.getErrorMsg());
}
} finally {
lock.unlock();
@@ -1426,7 +1426,7 @@ public class Coordinator implements CoordInterface {
throw new RpcException(null, copyStatus.getErrorMsg());
} else {
String errMsg = copyStatus.getErrorMsg();
- LOG.warn("query failed: {}", errMsg);
+ LOG.warn("Query {} failed: {}", DebugUtil.printId(queryId),
errMsg);
throw new UserException(errMsg);
}
}
@@ -1441,7 +1441,7 @@ public class Coordinator implements CoordInterface {
if (LOG.isDebugEnabled()) {
LOG.debug("no block query, return num >= limit rows, need
cancel");
}
- cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH);
+ cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH,
"query reach limit");
}
if (ConnectContext.get() != null &&
ConnectContext.get().getSessionVariable().dryRunQuery) {
numReceivedRows = 0;
@@ -1528,8 +1528,8 @@ public class Coordinator implements CoordInterface {
// Cancel execution of query. This includes the execution of the local plan
// fragment,
// if any, as well as all plan fragments on remote nodes.
- public void cancel() {
- cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, "user cancel");
+ public void cancel(String errorMsg) {
+ cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, errorMsg);
if (queueToken != null) {
queueToken.cancel();
}
@@ -1552,8 +1552,8 @@ public class Coordinator implements CoordInterface {
queryStatus.updateStatus(TStatusCode.CANCELLED, errorMsg);
}
LOG.warn("Cancel execution of query {}, this is a outside invoke,
cancelReason {}",
- DebugUtil.printId(queryId), cancelReason.toString());
- cancelInternal(cancelReason);
+ DebugUtil.printId(queryId), errorMsg);
+ cancelInternal(cancelReason, errorMsg);
} finally {
unlock();
}
@@ -1577,9 +1577,9 @@ public class Coordinator implements CoordInterface {
}
}
- private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason) {
+ private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason,
String cancelMessage) {
if (null != receiver) {
- receiver.cancel(cancelReason);
+ receiver.cancel(cancelReason, cancelMessage);
}
if (null != pointExec) {
pointExec.cancel();
@@ -3307,10 +3307,6 @@ public class Coordinator implements CoordInterface {
DebugUtil.printId(fragmentInstanceId()), status.toString());
}
}
- LOG.warn("Failed to cancel query {} instance
initiated={} done={} backend: {},"
- + "fragment instance id={}, reason: {}",
- DebugUtil.printId(queryId), initiated,
done, backend.getId(),
- DebugUtil.printId(fragmentInstanceId()),
"without status");
}
public void onFailure(Throwable t) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
index 981d720e8b9..43ad573bf79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiver.java
@@ -111,8 +111,8 @@ public class ResultReceiver {
LOG.warn("Query {} get result timeout, get result
duration {} ms",
DebugUtil.printId(this.queryId), (timeoutTs -
currentTs) / 1000);
setRunStatus(Status.TIMEOUT);
- status.updateStatus(TStatusCode.TIMEOUT, "");
- updateCancelReason("fetch data timeout");
+ status.updateStatus(TStatusCode.TIMEOUT, "Query
timeout");
+ updateCancelReason("Query timeout");
return null;
} catch (InterruptedException e) {
// continue to get result
@@ -183,7 +183,7 @@ public class ResultReceiver {
}
} catch (TimeoutException e) {
LOG.warn("fetch result timeout, finstId={}",
DebugUtil.printId(finstId), e);
- status.updateStatus(TStatusCode.TIMEOUT, "query timeout");
+ status.updateStatus(TStatusCode.TIMEOUT, "Query timeout");
} finally {
synchronized (this) {
currentThread = null;
@@ -205,13 +205,14 @@ public class ResultReceiver {
}
}
- public void cancel(Types.PPlanFragmentCancelReason reason) {
+ public void cancel(Types.PPlanFragmentCancelReason reason, String
cancelMessage) {
if (reason == Types.PPlanFragmentCancelReason.TIMEOUT) {
setRunStatus(Status.TIMEOUT);
} else {
setRunStatus(Status.CANCELLED);
}
- updateCancelReason(reason.toString());
+
+ updateCancelReason(cancelMessage);
synchronized (this) {
if (currentThread != null) {
// TODO(cmy): we cannot interrupt this thread, or we may throw
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 3efed4b7650..84aed148106 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1475,7 +1475,7 @@ public class StmtExecutor {
}
// Because this is called by other thread
- public void cancel() {
+ public void cancel(String message) {
Optional<InsertOverwriteTableCommand> insertOverwriteTableCommand =
getInsertOverwriteTableCommand();
if (insertOverwriteTableCommand.isPresent()) {
// If the be scheduling has not been triggered yet, cancel the
scheduling first
@@ -1483,7 +1483,7 @@ public class StmtExecutor {
}
Coordinator coordRef = coord;
if (coordRef != null) {
- coordRef.cancel();
+ coordRef.cancel(message);
}
if (mysqlLoadId != null) {
Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java
index 2dcff6075f4..d512bcfb489 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadActionCancelQuery.java
@@ -32,7 +32,7 @@ public class WorkloadActionCancelQuery implements
WorkloadAction {
&& queryInfo.tUniqueId != null
&&
QeProcessorImpl.INSTANCE.getCoordinator(queryInfo.tUniqueId) != null) {
LOG.info("cancel query {} triggered by query schedule policy.",
queryInfo.queryId);
- queryInfo.context.cancelQuery();
+ queryInfo.context.cancelQuery("cancel query by workload policy");
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
index 9f703dff92b..406497c77db 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
@@ -74,7 +74,7 @@ public class FlightSqlConnectContext extends ConnectContext {
connectScheduler.unregisterConnection(this);
}
// Now, cancel running query.
- cancelQuery();
+ cancelQuery("arrow flight query killed by user");
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
index 31fae23284b..f2919afbb5a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java
@@ -192,7 +192,7 @@ public abstract class BaseAnalysisTask {
public void cancel() {
killed = true;
if (stmtExecutor != null) {
- stmtExecutor.cancel();
+ stmtExecutor.cancel("analysis task cancelled");
}
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(info, AnalysisState.FAILED,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]