This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e7082495fc8 [enhancement](stmt-forward) make forwarded stmt cancelable 
(#31688)
e7082495fc8 is described below

commit e7082495fc84be53bab7888457bd1eed54c87d43
Author: Siyang Tang <[email protected]>
AuthorDate: Tue Mar 26 17:08:28 2024 +0800

    [enhancement](stmt-forward) make forwarded stmt cancelable (#31688)
    
    Co-authored-by: Xin Liao <[email protected]>
---
 .../java/org/apache/doris/qe/MasterOpExecutor.java | 36 +++++++++++++++++++---
 .../java/org/apache/doris/qe/StmtExecutor.java     |  8 +++++
 .../apache/doris/service/FrontendServiceImpl.java  | 27 +++++++++++++++-
 gensrc/thrift/FrontendService.thrift               |  1 +
 4 files changed, 66 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index cdf588eaebf..c54fbace2d3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -28,6 +28,7 @@ import org.apache.doris.thrift.TMasterOpResult;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TUniqueId;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import org.apache.logging.log4j.LogManager;
@@ -49,6 +50,8 @@ public class MasterOpExecutor {
     private final ConnectContext ctx;
     private TMasterOpResult result;
 
+    private TNetworkAddress masterAddr;
+
     private int waitTimeoutMs;
     // the total time of thrift connectTime add readTime and writeTime
     private int thriftTimeoutMs;
@@ -85,17 +88,40 @@ public class MasterOpExecutor {
         waitOnReplaying();
     }
 
+    public void cancel() throws Exception {
+        TUniqueId queryId = ctx.queryId();
+        if (queryId == null) {
+            return;
+        }
+        Preconditions.checkNotNull(masterAddr, "query with id %s is not 
forwarded to master", queryId);
+        TMasterOpRequest request = new TMasterOpRequest();
+        request.setCancelQeury(true);
+        request.setQueryId(queryId);
+        request.setDb(ctx.getDatabase());
+        request.setUser(ctx.getQualifiedUser());
+        request.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
+        request.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
+        // a cancel request carries no statement; set an empty sql to keep the protocol happy
+        request.setSql("");
+        result = forward(masterAddr, request);
+        waitOnReplaying();
+    }
+
     private void waitOnReplaying() throws DdlException {
         LOG.info("forwarding to master get result max journal id: {}", 
result.maxJournalId);
         ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, 
waitTimeoutMs);
     }
 
-    // Send request to Master
     private TMasterOpResult forward(TMasterOpRequest params) throws Exception {
-        ctx.getEnv().checkReadyOrThrow();
         String masterHost = ctx.getEnv().getMasterHost();
         int masterRpcPort = ctx.getEnv().getMasterRpcPort();
-        TNetworkAddress thriftAddress = new TNetworkAddress(masterHost, 
masterRpcPort);
+        masterAddr = new TNetworkAddress(masterHost, masterRpcPort);
+        return forward(masterAddr, params);
+    }
+
+    // Send request to Master
+    private TMasterOpResult forward(TNetworkAddress thriftAddress, 
TMasterOpRequest params) throws Exception {
+        ctx.getEnv().checkReadyOrThrow();
 
         FrontendService.Client client;
         try {
@@ -182,9 +208,9 @@ public class MasterOpExecutor {
         params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
         params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
         params.setSyncJournalOnly(true);
+        params.setDb(ctx.getDatabase());
+        params.setUser(ctx.getQualifiedUser());
         // just make the protocol happy
-        params.setDb("");
-        params.setUser("");
         params.setSql("");
         return params;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 808fea13be2..78465427d49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1454,6 +1454,14 @@ public class StmtExecutor {
 
     // Because this is called by other thread
     public void cancel() {
+        if (masterOpExecutor != null) {
+            try {
+                masterOpExecutor.cancel();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            return;
+        }
         Coordinator coordRef = coord;
         if (coordRef != null) {
             coordRef.cancel();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index df51b868359..ad25442106e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -233,6 +233,7 @@ import org.apache.doris.thrift.TTableRef;
 import org.apache.doris.thrift.TTableStatus;
 import org.apache.doris.thrift.TTabletLocation;
 import org.apache.doris.thrift.TTxnParams;
+import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.thrift.TUpdateExportTaskStatusRequest;
 import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
 import org.apache.doris.thrift.TWaitingTxnStatusRequest;
@@ -288,6 +289,9 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
     private ConcurrentHashMap<Long, Integer> 
multiTableFragmentInstanceIdIndexMap =
             new ConcurrentHashMap<>(64);
 
+    private final Map<TUniqueId, ConnectContext> proxyQueryIdToConnCtx =
+            new ConcurrentHashMap<>(64);
+
     private static TNetworkAddress getMasterAddress() {
         Env env = Env.getCurrentEnv();
         String masterHost = env.getMasterHost();
@@ -987,6 +991,22 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             result.setPacket("".getBytes());
             return result;
         }
+        if (params.isSetCancelQeury() && params.isCancelQeury()) {
+            if (!params.isSetQueryId()) {
+                throw new TException("a query id is needed to cancel a query");
+            }
+            TUniqueId queryId = params.getQueryId();
+            ConnectContext ctx = proxyQueryIdToConnCtx.get(queryId);
+            if (ctx != null) {
+                ctx.cancelQuery();
+            }
+            final TMasterOpResult result = new TMasterOpResult();
+            result.setStatusCode(0);
+            result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
+            // a cancel reply carries no result data; set an empty packet to keep the protocol happy
+            result.setPacket("".getBytes());
+            return result;
+        }
 
         // add this log so that we can track this stmt
         if (LOG.isDebugEnabled()) {
@@ -1007,7 +1027,11 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         } else {
             throw new TException("unknown ConnectType: " + 
context.getConnectType());
         }
-
+        Runnable clearCallback = () -> {};
+        if (params.isSetQueryId()) {
+            proxyQueryIdToConnCtx.put(params.getQueryId(), context);
+            clearCallback = () -> 
proxyQueryIdToConnCtx.remove(params.getQueryId());
+        }
         TMasterOpResult result = processor.proxyExecute(params);
         if 
(QueryState.MysqlStateType.ERR.name().equalsIgnoreCase(result.getStatus())) {
             context.getState().setError(result.getStatus());
@@ -1015,6 +1039,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             context.getState().setOk();
         }
         ConnectContext.remove();
+        clearCallback.run();
         return result;
     }
 
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 7251afd7624..568b174c11c 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -524,6 +524,7 @@ struct TMasterOpRequest {
     24: optional bool syncJournalOnly // if set to true, this request means to 
do nothing but just sync max journal id of master
     25: optional string defaultCatalog
     26: optional string defaultDatabase
+    27: optional bool cancel_qeury // if set to true, this request means to 
cancel one forwarded query, and query_id needs to be set
 
     // selectdb cloud
     1000: optional string cloud_cluster


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to