This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 5dfc5d2c77c [enhancement](querycancel) print detail message when query
is cancelled (#38859)
5dfc5d2c77c is described below
commit 5dfc5d2c77ce0d5feaf1a67abd2669407ed19ac8
Author: yiguolei <[email protected]>
AuthorDate: Mon Aug 5 14:47:03 2024 +0800
[enhancement](querycancel) print detail message when query is cancelled
(#38859)
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
---------
Co-authored-by: yiguolei <[email protected]>
---
.../main/java/org/apache/doris/common/Status.java | 7 ++++
.../java/org/apache/doris/qe/CoordInterface.java | 2 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 43 +++++++++++++---------
.../java/org/apache/doris/qe/PointQueryExec.java | 2 +-
.../org/apache/doris/qe/PointQueryExecutor.java | 2 +-
.../org/apache/doris/qe/QueryCancelWorker.java | 6 ++-
.../java/org/apache/doris/qe/StmtExecutor.java | 12 ++++--
7 files changed, 47 insertions(+), 27 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
index 18852d8c04c..638c1123820 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Status.java
@@ -21,6 +21,8 @@ import org.apache.doris.proto.Types.PStatus;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+
public class Status {
public static final Status OK = new Status();
public static final Status CANCELLED = new Status(TStatusCode.CANCELLED,
"Cancelled");
@@ -43,6 +45,11 @@ public class Status {
this.errorMsg = errorMsg;
}
+ public Status(TStatusCode code, final String errorMsg, final
Object...params) {
+ this.errorCode = code;
+ this.errorMsg = ParameterizedMessage.format(errorMsg, params);
+ }
+
public Status(final TStatus status) {
this.errorCode = status.status_code;
if (status.isSetErrorMsgs()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
index 5718e68c6b0..baf6d922e4e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/CoordInterface.java
@@ -25,7 +25,7 @@ public interface CoordInterface {
public RowBatch getNext() throws Exception;
- public void cancel(Types.PPlanFragmentCancelReason cancelReason);
+ public void cancel(Types.PPlanFragmentCancelReason cancelReason, String
errorMsg);
// When call exec or get next data finished, should call this method to
release
// some resource.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 9e22f853c73..dac9be06b9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1443,7 +1443,7 @@ public class Coordinator implements CoordInterface {
// We use a very conservative cancel strategy.
// 0. If backends has zero process epoch, do not cancel. Zero process
epoch usually arises in cluster upgrading.
// 1. If process epoch is same, do not cancel. Means backends does not
restart or die.
- public boolean shouldCancel(List<Backend> currentBackends) {
+ public Status shouldCancel(List<Backend> currentBackends) {
Map<Long, Backend> curBeMap = Maps.newHashMap();
for (Backend be : currentBackends) {
curBeMap.put(be.getId(), be);
@@ -1456,21 +1456,24 @@ public class Coordinator implements CoordInterface {
for (PipelineExecContext pipelineExecContext :
pipelineExecContexts.values()) {
Backend be =
curBeMap.get(pipelineExecContext.backend.getId());
if (be == null || !be.isAlive()) {
- LOG.warn("Backend {} not exists or dead, query {}
should be cancelled",
+ Status errorStatus = new Status(TStatusCode.CANCELLED,
+ "Backend {} not exists or dead, query {}
should be cancelled",
pipelineExecContext.backend.toString(),
DebugUtil.printId(queryId));
- return true;
+ LOG.warn(errorStatus.getErrorMsg());
+ return errorStatus;
}
// Backend process epoch changed, indicates that this be
restarts, query should be cancelled.
// Check zero since during upgrading, older version oplog
will not persistent be start time
// so newer version follower will get zero epoch when
replaying oplog or snapshot
if (pipelineExecContext.beProcessEpoch !=
be.getProcessEpoch() && be.getProcessEpoch() != 0) {
- LOG.warn("Backend process epoch changed, previous {}
now {}, "
- + "means this be has already
restarted, should cancel this coordinator,"
- + " query id {}",
- pipelineExecContext.beProcessEpoch,
be.getProcessEpoch(),
- DebugUtil.printId(queryId));
- return true;
+ Status errorStatus = new Status(TStatusCode.CANCELLED,
+ "Backend process epoch changed, previous {}
now {}, "
+ + "means this be has already restarted, should
cancel this coordinator,"
+ + "query id {}",
pipelineExecContext.beProcessEpoch, be.getProcessEpoch(),
+ DebugUtil.printId(queryId));
+ LOG.warn(errorStatus.getErrorMsg());
+ return errorStatus;
} else if (be.getProcessEpoch() == 0) {
LOG.warn("Backend {} has zero process epoch, maybe we
are upgrading cluster?",
be.toString());
@@ -1481,23 +1484,27 @@ public class Coordinator implements CoordInterface {
for (BackendExecStates beExecState : beToExecStates.values()) {
Backend be = curBeMap.get(beExecState.beId);
if (be == null || !be.isAlive()) {
- LOG.warn("Backend {} not exists or dead, query {}
should be cancelled.",
+ Status errorStatus = new Status(TStatusCode.CANCELLED,
+ "Backend {} not exists or dead, query {}
should be cancelled.",
beExecState.beId, DebugUtil.printId(queryId));
- return true;
+ LOG.warn(errorStatus.getErrorMsg());
+ return errorStatus;
}
if (beExecState.beProcessEpoch != be.getProcessEpoch() &&
be.getProcessEpoch() != 0) {
- LOG.warn("Process epoch changed, previous {} now {},
means this be has already restarted, "
- + "should cancel this coordinator,
query id {}",
+ Status errorStatus = new Status(TStatusCode.CANCELLED,
+ "Process epoch changed, previous {} now {},
means this be has already restarted,"
+ + "should cancel this coordinator, query id
{}",
beExecState.beProcessEpoch,
be.getProcessEpoch(), DebugUtil.printId(queryId));
- return true;
+ LOG.warn(errorStatus.getErrorMsg());
+ return errorStatus;
} else if (be.getProcessEpoch() == 0) {
LOG.warn("Backend {} has zero process epoch, maybe we
are upgrading cluster?", be.toString());
}
}
}
- return false;
+ return Status.OK;
} finally {
unlock();
}
@@ -1507,14 +1514,14 @@ public class Coordinator implements CoordInterface {
// fragment,
// if any, as well as all plan fragments on remote nodes.
public void cancel() {
- cancel(Types.PPlanFragmentCancelReason.USER_CANCEL);
+ cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, "user cancel");
if (queueToken != null) {
queueToken.signalForCancel();
}
}
@Override
- public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
+ public void cancel(Types.PPlanFragmentCancelReason cancelReason, String
errorMsg) {
for (ScanNode scanNode : scanNodes) {
scanNode.stop();
}
@@ -1527,7 +1534,7 @@ public class Coordinator implements CoordInterface {
DebugUtil.printId(queryId), queryStatus.toString(),
new Exception("cancel failed"));
} else {
- queryStatus.updateStatus(TStatusCode.CANCELLED, "cancelled");
+ queryStatus.updateStatus(TStatusCode.CANCELLED, errorMsg);
}
LOG.warn("Cancel execution of query {}, this is a outside invoke,
cancelReason {}",
DebugUtil.printId(queryId), cancelReason.toString());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
index f31251dae17..b4812280896 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExec.java
@@ -169,7 +169,7 @@ public class PointQueryExec implements CoordInterface {
}
@Override
- public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
+ public void cancel(Types.PPlanFragmentCancelReason cancelReason, String
errorMsg) {
// Do nothing
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
index 572367fa33b..b0af8431471 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/PointQueryExecutor.java
@@ -163,7 +163,7 @@ public class PointQueryExecutor implements CoordInterface {
}
@Override
- public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
+ public void cancel(Types.PPlanFragmentCancelReason cancelReason, String
errorMsg) {
// Do nothing
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java
index 500cad0f28d..cf1b4fec286 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryCancelWorker.java
@@ -17,6 +17,7 @@
package org.apache.doris.qe;
+import org.apache.doris.common.Status;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.proto.Types;
import org.apache.doris.system.Backend;
@@ -36,10 +37,11 @@ public class QueryCancelWorker extends MasterDaemon {
List<Backend> allBackends = systemInfoService.getAllBackends();
for (Coordinator co : QeProcessorImpl.INSTANCE.getAllCoordinators()) {
- if (co.shouldCancel(allBackends)) {
+ Status status = co.shouldCancel(allBackends);
+ if (!status.ok()) {
// TODO(zhiqiang): We need more clear cancel message, so that
user can figure out what happened
// by searching log.
- co.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+ co.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR,
status.getErrorMsg());
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 30dbedbf3de..dbab3ab3957 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -103,6 +103,7 @@ import org.apache.doris.common.FormatOptions;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.NereidsException;
import org.apache.doris.common.NereidsSqlCacheManager;
+import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.profile.Profile;
@@ -1453,7 +1454,7 @@ public class StmtExecutor {
public void cancel(Types.PPlanFragmentCancelReason cancelReason) {
Coordinator coordRef = coord;
if (coordRef != null) {
- coordRef.cancel(cancelReason);
+ coordRef.cancel(cancelReason, "");
}
if (mysqlLoadId != null) {
Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId);
@@ -1874,8 +1875,11 @@ public class StmtExecutor {
// notify all be cancel running fragment
// in some case may block all fragment handle threads
// details see issue https://github.com/apache/doris/issues/16203
- LOG.warn("cancel fragment query_id:{} cause {}",
DebugUtil.printId(context.queryId()), e.getMessage());
- coordBase.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
+ Status internalErrorSt = new Status(TStatusCode.INTERNAL_ERROR,
+ "cancel fragment query_id:{} cause {}",
+ DebugUtil.printId(context.queryId()), e.getMessage());
+ LOG.warn(internalErrorSt.getErrorMsg());
+ coordBase.cancel(Types.PPlanFragmentCancelReason.INTERNAL_ERROR,
internalErrorSt.getErrorMsg());
throw e;
} finally {
coordBase.close();
@@ -2257,7 +2261,7 @@ public class StmtExecutor {
}
boolean notTimeout = coord.join(execTimeout);
if (!coord.isDone()) {
- coord.cancel(Types.PPlanFragmentCancelReason.TIMEOUT);
+ coord.cancel(Types.PPlanFragmentCancelReason.TIMEOUT,
"timeout");
if (notTimeout) {
errMsg = coord.getExecStatus().getErrorMsg();
ErrorReport.reportDdlException("There exists unhealthy
backend. "
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org