This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e7082495fc8 [enhancement](stmt-forward) make forwarded stmt cancelable (#31688)
e7082495fc8 is described below
commit e7082495fc84be53bab7888457bd1eed54c87d43
Author: Siyang Tang <[email protected]>
AuthorDate: Tue Mar 26 17:08:28 2024 +0800
[enhancement](stmt-forward) make forwarded stmt cancelable (#31688)
Co-authored-by: Xin Liao <[email protected]>
---
.../java/org/apache/doris/qe/MasterOpExecutor.java | 36 +++++++++++++++++++---
.../java/org/apache/doris/qe/StmtExecutor.java | 8 +++++
.../apache/doris/service/FrontendServiceImpl.java | 27 +++++++++++++++-
gensrc/thrift/FrontendService.thrift | 1 +
4 files changed, 66 insertions(+), 6 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index cdf588eaebf..c54fbace2d3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -28,6 +28,7 @@ import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUniqueId;
+import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import org.apache.logging.log4j.LogManager;
@@ -49,6 +50,8 @@ public class MasterOpExecutor {
private final ConnectContext ctx;
private TMasterOpResult result;
+ private TNetworkAddress masterAddr;
+
private int waitTimeoutMs;
// the total time of thrift connectTime add readTime and writeTime
private int thriftTimeoutMs;
@@ -85,17 +88,40 @@ public class MasterOpExecutor {
waitOnReplaying();
}
+ public void cancel() throws Exception {
+ TUniqueId queryId = ctx.queryId();
+ if (queryId == null) {
+ return;
+ }
+ Preconditions.checkNotNull(masterAddr, "query with id %s is not forwarded to master", queryId);
+ TMasterOpRequest request = new TMasterOpRequest();
+ request.setCancelQeury(true);
+ request.setQueryId(queryId);
+ request.setDb(ctx.getDatabase());
+ request.setUser(ctx.getQualifiedUser());
+ request.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
+ request.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
+ // just make the protocol happy
+ request.setSql("");
+ result = forward(masterAddr, request);
+ waitOnReplaying();
+ }
+
private void waitOnReplaying() throws DdlException {
LOG.info("forwarding to master get result max journal id: {}", result.maxJournalId);
ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, waitTimeoutMs);
}
- // Send request to Master
private TMasterOpResult forward(TMasterOpRequest params) throws Exception {
- ctx.getEnv().checkReadyOrThrow();
String masterHost = ctx.getEnv().getMasterHost();
int masterRpcPort = ctx.getEnv().getMasterRpcPort();
- TNetworkAddress thriftAddress = new TNetworkAddress(masterHost, masterRpcPort);
+ masterAddr = new TNetworkAddress(masterHost, masterRpcPort);
+ return forward(masterAddr, params);
+ }
+
+ // Send request to Master
+ private TMasterOpResult forward(TNetworkAddress thriftAddress, TMasterOpRequest params) throws Exception {
+ ctx.getEnv().checkReadyOrThrow();
FrontendService.Client client;
try {
@@ -182,9 +208,9 @@ public class MasterOpExecutor {
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
params.setSyncJournalOnly(true);
+ params.setDb(ctx.getDatabase());
+ params.setUser(ctx.getQualifiedUser());
// just make the protocol happy
- params.setDb("");
- params.setUser("");
params.setSql("");
return params;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 808fea13be2..78465427d49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1454,6 +1454,14 @@ public class StmtExecutor {
// Because this is called by other thread
public void cancel() {
+ if (masterOpExecutor != null) {
+ try {
+ masterOpExecutor.cancel();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return;
+ }
Coordinator coordRef = coord;
if (coordRef != null) {
coordRef.cancel();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index df51b868359..ad25442106e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -233,6 +233,7 @@ import org.apache.doris.thrift.TTableRef;
import org.apache.doris.thrift.TTableStatus;
import org.apache.doris.thrift.TTabletLocation;
import org.apache.doris.thrift.TTxnParams;
+import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TUpdateExportTaskStatusRequest;
import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
import org.apache.doris.thrift.TWaitingTxnStatusRequest;
@@ -288,6 +289,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
private ConcurrentHashMap<Long, Integer>
multiTableFragmentInstanceIdIndexMap =
new ConcurrentHashMap<>(64);
+ private final Map<TUniqueId, ConnectContext> proxyQueryIdToConnCtx =
+ new ConcurrentHashMap<>(64);
+
private static TNetworkAddress getMasterAddress() {
Env env = Env.getCurrentEnv();
String masterHost = env.getMasterHost();
@@ -987,6 +991,22 @@ public class FrontendServiceImpl implements FrontendService.Iface {
result.setPacket("".getBytes());
return result;
}
+ if (params.isSetCancelQeury() && params.isCancelQeury()) {
+ if (!params.isSetQueryId()) {
+ throw new TException("a query id is needed to cancel a query");
+ }
+ TUniqueId queryId = params.getQueryId();
+ ConnectContext ctx = proxyQueryIdToConnCtx.get(queryId);
+ if (ctx != null) {
+ ctx.cancelQuery();
+ }
+ final TMasterOpResult result = new TMasterOpResult();
+ result.setStatusCode(0);
+ result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
+ // just make the protocol happy
+ result.setPacket("".getBytes());
+ return result;
+ }
// add this log so that we can track this stmt
if (LOG.isDebugEnabled()) {
@@ -1007,7 +1027,11 @@ public class FrontendServiceImpl implements FrontendService.Iface {
} else {
throw new TException("unknown ConnectType: " + context.getConnectType());
}
-
+ Runnable clearCallback = () -> {};
+ if (params.isSetQueryId()) {
+ proxyQueryIdToConnCtx.put(params.getQueryId(), context);
+ clearCallback = () -> proxyQueryIdToConnCtx.remove(params.getQueryId());
+ }
TMasterOpResult result = processor.proxyExecute(params);
if (QueryState.MysqlStateType.ERR.name().equalsIgnoreCase(result.getStatus())) {
context.getState().setError(result.getStatus());
@@ -1015,6 +1039,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
context.getState().setOk();
}
ConnectContext.remove();
+ clearCallback.run();
return result;
}
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 7251afd7624..568b174c11c 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -524,6 +524,7 @@ struct TMasterOpRequest {
24: optional bool syncJournalOnly // if set to true, this request means to do nothing but just sync max journal id of master
25: optional string defaultCatalog
26: optional string defaultDatabase
+ 27: optional bool cancel_qeury // if set to true, this request means to cancel one forwarded query, and query_id needs to be set
// selectdb cloud
1000: optional string cloud_cluster
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]