This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
     new e3e93026bf [feature](http) add api for showing current queries and 
kill query (#11657)
e3e93026bf is described below

commit e3e93026bfe629cbde117232401ecd10c1c803f6
Author: Mingyu Chen <[email protected]>
AuthorDate: Thu Aug 11 10:32:46 2022 +0800

    [feature](http) add api for showing current queries and kill query (#11657)
---
 .../fe/manager/query-profile-action.md             |  62 ++++++++-
 .../fe/manager/query-profile-action.md             |  69 +++++++++-
 .../httpv2/rest/manager/QueryProfileAction.java    | 149 ++++++++++++++++++++-
 .../java/org/apache/doris/qe/ConnectContext.java   |   6 +-
 4 files changed, 278 insertions(+), 8 deletions(-)

diff --git 
a/docs/en/administrator-guide/http-actions/fe/manager/query-profile-action.md 
b/docs/en/administrator-guide/http-actions/fe/manager/query-profile-action.md
index 037ca5a437..50d59191a8 100644
--- 
a/docs/en/administrator-guide/http-actions/fe/manager/query-profile-action.md
+++ 
b/docs/en/administrator-guide/http-actions/fe/manager/query-profile-action.md
@@ -305,4 +305,64 @@ Get the tree profile information of the specified query 
id, same as `show query
     },
     "count": 0
 }
-```
\ No newline at end of file
+```
+
+## Current running queries
+
+`GET /rest/v2/manager/query/current_queries`
+
+### Description
+
+Same as `show proc "/current_query_stmts"`; returns the queries that are currently running.
+    
+### Path parameters
+
+### Query parameters
+
+* `is_all_node`
+  
+    Optional. If set to true, returns the currently running queries from all FE nodes. Default is true.
+
+### Response
+
+```
+{
+       "msg": "success",
+       "code": 0,
+       "data": {
+               "columnNames": ["Frontend", "QueryId", "ConnectionId", 
"Database", "User", "ExecTime", "SqlHash", "Statement"],
+               "rows": [
+                       ["172.19.0.3", "108e47ab438a4560-ab1651d16c036491", 
"2", "", "root", "6074", "1a35f62f4b14b9d7961b057b77c3102f", "select 
sleep(60)"],
+                       ["172.19.0.11", "3606cad4e34b49c6-867bf6862cacc645", 
"3", "", "root", "9306", "1a35f62f4b14b9d7961b057b77c3102f", "select sleep(60)"]
+               ]
+       },
+       "count": 0
+}
+```
+
+## Cancel query
+
+`POST /rest/v2/manager/query/kill/{connection_id}`
+
+### Description
+
+Cancel the query running on the specified connection.
+    
+### Path parameters
+
+* `{connection_id}`
+
+    connection id
+
+### Query parameters
+
+### Response
+
+```
+{
+    "msg": "success",
+    "code": 0,
+    "data": "",
+    "count": 0
+}
+```
diff --git 
a/docs/zh-CN/administrator-guide/http-actions/fe/manager/query-profile-action.md
 
b/docs/zh-CN/administrator-guide/http-actions/fe/manager/query-profile-action.md
index f1acaebbe7..1917237807 100644
--- 
a/docs/zh-CN/administrator-guide/http-actions/fe/manager/query-profile-action.md
+++ 
b/docs/zh-CN/administrator-guide/http-actions/fe/manager/query-profile-action.md
@@ -34,9 +34,16 @@ under the License.
 
 `GET /rest/v2/manager/query/profile/text/{query_id}`
 
+`GET /rest/v2/manager/query/profile/graph/{query_id}`
+
+`GET /rest/v2/manager/query/profile/json/{query_id}`
+
 `GET /rest/v2/manager/query/profile/fragments/{query_id}`
 
-`GET /rest/v2/manager/query/profile/graph/{query_id}`
+`GET /rest/v2/manager/query/current_queries`
+
+`POST /rest/v2/manager/query/kill/{connection_id}`
+
 
 ## 获取查询信息
 
@@ -306,3 +313,63 @@ GET /rest/v2/manager/query/query_info
     "count": 0
 }
 ```
+
+## 正在执行的query
+
+`GET /rest/v2/manager/query/current_queries`
+
+### Description
+
+同 `show proc "/current_query_stmts"`,返回当前正在执行的 query
+    
+### Path parameters
+
+### Query parameters
+
+* `is_all_node`
+  
+    可选,若为 true 则返回所有FE节点当前正在执行的 query 信息。默认为 true。
+
+### Response
+
+```
+{
+       "msg": "success",
+       "code": 0,
+       "data": {
+               "columnNames": ["Frontend", "QueryId", "ConnectionId", 
"Database", "User", "ExecTime", "SqlHash", "Statement"],
+               "rows": [
+                       ["172.19.0.3", "108e47ab438a4560-ab1651d16c036491", 
"2", "", "root", "6074", "1a35f62f4b14b9d7961b057b77c3102f", "select 
sleep(60)"],
+                       ["172.19.0.11", "3606cad4e34b49c6-867bf6862cacc645", 
"3", "", "root", "9306", "1a35f62f4b14b9d7961b057b77c3102f", "select sleep(60)"]
+               ]
+       },
+       "count": 0
+}
+```
+
+## 取消query
+
+`POST /rest/v2/manager/query/kill/{connection_id}`
+
+### Description
+
+取消指定连接中正在执行的 query
+    
+### Path parameters
+
+* `{connection_id}`
+
+    connection id
+
+### Query parameters
+
+### Response
+
+```
+{
+    "msg": "success",
+    "code": 0,
+    "data": "",
+    "count": 0
+}
+```
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
index 28e96ddbe3..7cbaa8273b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
@@ -21,6 +21,8 @@ import org.apache.doris.catalog.Catalog;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.proc.CurrentQueryStatementsProcNode;
+import org.apache.doris.common.proc.ProcResult;
 import org.apache.doris.common.profile.ProfileTreeNode;
 import org.apache.doris.common.profile.ProfileTreePrinter;
 import org.apache.doris.common.util.ProfileManager;
@@ -29,6 +31,8 @@ import org.apache.doris.httpv2.rest.RestBaseController;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.service.ExecuteEnv;
+import org.apache.doris.service.FrontendOptions;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
@@ -58,6 +62,14 @@ import javax.servlet.http.HttpServletResponse;
 
 /*
  * Used to return query information and query profile.
+ * base: /rest/v2/manager/query
+ * 1. /query_info
+ * 2. /sql/{query_id}
+ * 3. /profile/{format}/{query_id}
+ * 4. /trace_id/{trace_id}
+ * 5. /profile/fragments/{query_id}
+ * 6. /current_queries
+ * 7. /kill/{connection_id}
  */
 @RestController
 @RequestMapping("/rest/v2/manager/query")
@@ -81,10 +93,11 @@ public class QueryProfileAction extends RestBaseController {
     private static final String FRAGMENT_ID = "fragment_id";
     private static final String INSTANCE_ID = "instance_id";
 
-    public static final ImmutableList<String> QUERY_TITLE_NAMES = new 
ImmutableList.Builder<String>()
-            
.add(QUERY_ID).add(NODE).add(USER).add(DEFAULT_DB).add(SQL_STATEMENT)
-            
.add(QUERY_TYPE).add(START_TIME).add(END_TIME).add(TOTAL).add(QUERY_STATE)
-            .build();
+    private static final String FRONTEND = "Frontend";
+
+    public static final ImmutableList<String> QUERY_TITLE_NAMES = new 
ImmutableList.Builder<String>().add(QUERY_ID)
+            
.add(NODE).add(USER).add(DEFAULT_DB).add(SQL_STATEMENT).add(QUERY_TYPE).add(START_TIME).add(END_TIME)
+            .add(TOTAL).add(QUERY_STATE).build();
 
     private List<String> requestAllFe(String httpPath, Map<String, String> 
arguments, String authorization) {
         List<Pair<String, Integer>> frontends = HttpUtils.getFeList();
@@ -336,4 +349,130 @@ public class QueryProfileAction extends 
RestBaseController {
         }
         return ResponseEntityBuilder.ok(graph);
     }
-}
\ No newline at end of file
+
+    /**
+     * Returns the profile of a query as a JSON string, stored in a map under the key "profile".
+     *
+     * <p>When {@code isAllNode} is true the request is delegated to
+     * {@code getProfileFromAllFrontends} with format "json". Otherwise the profile tree is read
+     * from {@code ProfileManager}: at fragment level if either {@code fragmentId} or
+     * {@code instanceId} is empty, at instance level when both are given.
+     *
+     * <p>Failures while building the local profile are logged and swallowed; the caller then
+     * receives an OK response whose data map has no "profile" entry.
+     *
+     * @param request the incoming HTTP request, passed on when delegating to all frontends
+     * @param queryId id of the query whose profile is requested
+     * @param fragmentId optional fragment id
+     * @param instanceId optional instance id
+     * @param isAllNode whether to ask all frontends instead of reading the local profile
+     * @return the response entity carrying the profile map
+     */
+    @NotNull
+    private ResponseEntity getJsonProfile(HttpServletRequest request, String queryId, String fragmentId,
+            String instanceId, boolean isAllNode) {
+        if (isAllNode) {
+            return getProfileFromAllFrontends(request, "json", queryId, fragmentId, instanceId);
+        }
+
+        // Only the local path needs a result map.
+        Map<String, String> result = Maps.newHashMap();
+        try {
+            JSONObject json;
+            if (Strings.isNullOrEmpty(fragmentId) || Strings.isNullOrEmpty(instanceId)) {
+                ProfileTreeNode treeRoot = ProfileManager.getInstance().getFragmentProfileTree(queryId, queryId);
+                json = ProfileTreePrinter.printFragmentTreeInJson(treeRoot,
+                        ProfileTreePrinter.PrintLevel.FRAGMENT);
+            } else {
+                ProfileTreeNode treeRoot = ProfileManager.getInstance()
+                        .getInstanceProfileTree(queryId, queryId, fragmentId, instanceId);
+                json = ProfileTreePrinter.printFragmentTreeInJson(treeRoot,
+                        ProfileTreePrinter.PrintLevel.INSTANCE);
+            }
+            result.put("profile", json.toJSONString());
+        } catch (Exception e) {
+            // Best effort: keep answering OK (with an empty map) but record what went wrong.
+            LOG.warn("get profile json error, queryId:{}, fragmentId:{}, instanceId:{}", queryId, fragmentId,
+                    instanceId, e);
+        }
+        return ResponseEntityBuilder.ok(result);
+    }
+
+    /**
+     * Fetches the profile of a query in the given format through {@code requestAllFe} and
+     * returns the "profile" field of the first answer.
+     *
+     * @param request incoming HTTP request; its authorization header is forwarded
+     * @param format profile format, used as a path segment of the forwarded request (e.g. "json")
+     * @param queryId id of the query whose profile is requested
+     * @param fragmentId optional fragment id; forwarded only when non-empty
+     * @param instanceId optional instance id; forwarded only when non-empty
+     * @return OK with a map holding "profile" (an empty map if {@code requestAllFe} returned no
+     *         data), or a bad-request response carrying the exception message if the first
+     *         answer cannot be parsed
+     */
+    @NotNull
+    private ResponseEntity getProfileFromAllFrontends(HttpServletRequest request, String format, String queryId,
+            String fragmentId, String instanceId) {
+        // The forwarded request always carries IS_ALL_NODE_PARA = "false".
+        ImmutableMap.Builder<String, String> forwarded = ImmutableMap.builder();
+        forwarded.put(IS_ALL_NODE_PARA, "false");
+        if (!Strings.isNullOrEmpty(fragmentId)) {
+            forwarded.put(FRAGMENT_ID, fragmentId);
+        }
+        if (!Strings.isNullOrEmpty(instanceId)) {
+            forwarded.put(INSTANCE_ID, instanceId);
+        }
+
+        String path = "/rest/v2/manager/query/profile/" + format + "/" + queryId;
+        String auth = request.getHeader(NodeAction.AUTHORIZATION);
+        List<String> answers = requestAllFe(path, forwarded.build(), auth);
+
+        Map<String, String> result = Maps.newHashMap();
+        if (answers.isEmpty()) {
+            return ResponseEntityBuilder.ok(result);
+        }
+        try {
+            // Only the first answer is consulted.
+            String firstAnswer = answers.get(0);
+            String profile = JsonParser.parseString(firstAnswer).getAsJsonObject().get("profile").getAsString();
+            result.put("profile", profile);
+        } catch (Exception e) {
+            return ResponseEntityBuilder.badRequest(e.getMessage());
+        }
+        return ResponseEntityBuilder.ok(result);
+    }
+
+    /**
+     * Lists the currently running queries, i.e. the result of CurrentQueryStatementsProcNode
+     * with an extra leading "Frontend" column.
+     *
+     * <p>Requires a successful password check and global ADMIN privilege.
+     *
+     * @param request incoming HTTP request; its authorization header is forwarded when the
+     *        request is sent on through {@code requestAllFe}
+     * @param response HTTP response handed to the password check
+     * @param isAllNode if true (the default), collect rows through {@code requestAllFe};
+     *        otherwise return only the rows of the local proc node, each prefixed with the
+     *        local host address
+     * @return OK with a NodeInfo of column titles and rows, or a bad-request response if the
+     *         local proc node throws an AnalysisException
+     */
+    @RequestMapping(path = "/current_queries", method = RequestMethod.GET)
+    public Object currentQueries(HttpServletRequest request, HttpServletResponse response,
+            @RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true") boolean isAllNode) {
+        executeCheckPassword(request, response);
+        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
+
+        // Both branches answer with the same header: the proc node titles preceded by "Frontend".
+        List<String> columns = Lists.newArrayList(CurrentQueryStatementsProcNode.TITLE_NAMES);
+        columns.add(0, FRONTEND);
+
+        if (!isAllNode) {
+            // Local answer only: tag every row with this host's address as first column.
+            try {
+                ProcResult procResult = new CurrentQueryStatementsProcNode().fetchResult();
+                List<List<String>> localRows = procResult.getRows();
+                String localAddress = FrontendOptions.getLocalHostAddress();
+                for (List<String> localRow : localRows) {
+                    localRow.add(0, localAddress);
+                }
+                return ResponseEntityBuilder.ok(new NodeAction.NodeInfo(columns, localRows));
+            } catch (AnalysisException e) {
+                return ResponseEntityBuilder.badRequest(e.getMessage());
+            }
+        }
+
+        // Fan out; the forwarded request carries IS_ALL_NODE_PARA = "false".
+        Map<String, String> forwardedArgs = Maps.newHashMap();
+        forwardedArgs.put(IS_ALL_NODE_PARA, "false");
+        List<String> perFeData = requestAllFe("/rest/v2/manager/query/current_queries", forwardedArgs,
+                request.getHeader(NodeAction.AUTHORIZATION));
+
+        List<List<String>> mergedRows = Lists.newArrayList();
+        for (String feData : perFeData) {
+            try {
+                NodeAction.NodeInfo feInfo = GsonUtils.GSON.fromJson(feData, new TypeToken<NodeAction.NodeInfo>() {
+                }.getType());
+                mergedRows.addAll(feInfo.getRows());
+            } catch (Exception e) {
+                // An answer that cannot be parsed is skipped; the remaining answers are still merged.
+                LOG.warn("parse query info error: {}", feData, e);
+            }
+        }
+        return ResponseEntityBuilder.ok(new NodeAction.NodeInfo(columns, mergedRows));
+    }
+
+    /**
+     * Cancels the query running on the connection with the given id.
+     *
+     * <p>Requires a successful password check and global ADMIN privilege. The connection
+     * context is looked up in the scheduler of {@code ExecuteEnv}, and only its
+     * {@code cancelQuery()} method is invoked.
+     *
+     * @param request incoming HTTP request used for the password check
+     * @param response HTTP response used for the password check
+     * @param connectionId id of the connection whose query should be cancelled
+     * @return an OK response once {@code cancelQuery()} has been called, or a not-found
+     *         response ("connection not found") if the scheduler has no context for the id
+     */
+    @RequestMapping(path = "/kill/{connection_id}", method = RequestMethod.POST)
+    public Object killQuery(HttpServletRequest request, HttpServletResponse response,
+            @PathVariable("connection_id") int connectionId) {
+        executeCheckPassword(request, response);
+        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
+
+        ConnectContext target = ExecuteEnv.getInstance().getScheduler().getContext(connectionId);
+        if (target != null) {
+            target.cancelQuery();
+            return ResponseEntityBuilder.ok();
+        }
+        return ResponseEntityBuilder.notFound("connection not found");
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 0e5c6565ba..ce9a13b705 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -470,7 +470,11 @@ public class ConnectContext {
             // Close channel to break connection with client
             getMysqlChannel().close();
         }
-        // Now, cancel running process.
+        // Now, cancel running query.
+        cancelQuery();
+    }
+
+    public void cancelQuery() {
         StmtExecutor executorRef = executor;
         if (executorRef != null) {
             executorRef.cancel();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to