This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new e3e93026bf [feature](http) add api for showing current queries and
kill query (#11657)
e3e93026bf is described below
commit e3e93026bfe629cbde117232401ecd10c1c803f6
Author: Mingyu Chen <[email protected]>
AuthorDate: Thu Aug 11 10:32:46 2022 +0800
[feature](http) add api for showing current queries and kill query (#11657)
---
.../fe/manager/query-profile-action.md | 62 ++++++++-
.../fe/manager/query-profile-action.md | 69 +++++++++-
.../httpv2/rest/manager/QueryProfileAction.java | 149 ++++++++++++++++++++-
.../java/org/apache/doris/qe/ConnectContext.java | 6 +-
4 files changed, 278 insertions(+), 8 deletions(-)
diff --git
a/docs/en/administrator-guide/http-actions/fe/manager/query-profile-action.md
b/docs/en/administrator-guide/http-actions/fe/manager/query-profile-action.md
index 037ca5a437..50d59191a8 100644
---
a/docs/en/administrator-guide/http-actions/fe/manager/query-profile-action.md
+++
b/docs/en/administrator-guide/http-actions/fe/manager/query-profile-action.md
@@ -305,4 +305,64 @@ Get the tree profile information of the specified query
id, same as `show query
},
"count": 0
}
-```
\ No newline at end of file
+```
+
+## Current running queries
+
+`GET /rest/v2/manager/query/current_queries`
+
+### Description
+
+Same as `show proc "/current_query_stmts"`; returns the currently running queries.
+
+### Path parameters
+
+### Query parameters
+
+* `is_all_node`
+
+    Optional. If set to true, returns the currently running queries from all FE nodes. Default is true.
+
+### Response
+
+```
+{
+ "msg": "success",
+ "code": 0,
+ "data": {
+        "columnNames": ["Frontend", "QueryId", "ConnectionId", "Database", "User", "ExecTime", "SqlHash", "Statement"],
+        "rows": [
+            ["172.19.0.3", "108e47ab438a4560-ab1651d16c036491", "2", "", "root", "6074", "1a35f62f4b14b9d7961b057b77c3102f", "select sleep(60)"],
+            ["172.19.0.11", "3606cad4e34b49c6-867bf6862cacc645", "3", "", "root", "9306", "1a35f62f4b14b9d7961b057b77c3102f", "select sleep(60)"]
+ ]
+ },
+ "count": 0
+}
+```
+
+## Cancel query
+
+`POST /rest/v2/manager/query/kill/{connection_id}`
+
+### Description
+
+Cancel the query currently running on the specified connection.
+
+### Path parameters
+
+* `{connection_id}`
+
+ connection id
+
+### Query parameters
+
+### Response
+
+```
+{
+ "msg": "success",
+ "code": 0,
+ "data": "",
+ "count": 0
+}
+```
diff --git
a/docs/zh-CN/administrator-guide/http-actions/fe/manager/query-profile-action.md
b/docs/zh-CN/administrator-guide/http-actions/fe/manager/query-profile-action.md
index f1acaebbe7..1917237807 100644
---
a/docs/zh-CN/administrator-guide/http-actions/fe/manager/query-profile-action.md
+++
b/docs/zh-CN/administrator-guide/http-actions/fe/manager/query-profile-action.md
@@ -34,9 +34,16 @@ under the License.
`GET /rest/v2/manager/query/profile/text/{query_id}`
+`GET /rest/v2/manager/query/profile/graph/{query_id}`
+
+`GET /rest/v2/manager/query/profile/json/{query_id}`
+
`GET /rest/v2/manager/query/profile/fragments/{query_id}`
-`GET /rest/v2/manager/query/profile/graph/{query_id}`
+`GET /rest/v2/manager/query/current_queries`
+
+`POST /rest/v2/manager/query/kill/{connection_id}`
+
## 获取查询信息
@@ -306,3 +313,63 @@ GET /rest/v2/manager/query/query_info
"count": 0
}
```
+
+## 正在执行的query
+
+`GET /rest/v2/manager/query/current_queries`
+
+### Description
+
+同 `show proc "/current_query_stmts"`,返回当前正在执行的 query
+
+### Path parameters
+
+### Query parameters
+
+* `is_all_node`
+
+ 可选,若为 true 则返回所有FE节点当前正在执行的 query 信息。默认为 true。
+
+### Response
+
+```
+{
+ "msg": "success",
+ "code": 0,
+ "data": {
+        "columnNames": ["Frontend", "QueryId", "ConnectionId", "Database", "User", "ExecTime", "SqlHash", "Statement"],
+        "rows": [
+            ["172.19.0.3", "108e47ab438a4560-ab1651d16c036491", "2", "", "root", "6074", "1a35f62f4b14b9d7961b057b77c3102f", "select sleep(60)"],
+            ["172.19.0.11", "3606cad4e34b49c6-867bf6862cacc645", "3", "", "root", "9306", "1a35f62f4b14b9d7961b057b77c3102f", "select sleep(60)"]
+ ]
+ },
+ "count": 0
+}
+```
+
+## 取消query
+
+`POST /rest/v2/manager/query/kill/{connection_id}`
+
+### Description
+
+取消指定连接中正在执行的 query
+
+### Path parameters
+
+* `{connection_id}`
+
+ connection id
+
+### Query parameters
+
+### Response
+
+```
+{
+ "msg": "success",
+ "code": 0,
+ "data": "",
+ "count": 0
+}
+```
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
index 28e96ddbe3..7cbaa8273b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/QueryProfileAction.java
@@ -21,6 +21,8 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.proc.CurrentQueryStatementsProcNode;
+import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.common.profile.ProfileTreeNode;
import org.apache.doris.common.profile.ProfileTreePrinter;
import org.apache.doris.common.util.ProfileManager;
@@ -29,6 +31,8 @@ import org.apache.doris.httpv2.rest.RestBaseController;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.service.ExecuteEnv;
+import org.apache.doris.service.FrontendOptions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
@@ -58,6 +62,14 @@ import javax.servlet.http.HttpServletResponse;
/*
* Used to return query information and query profile.
+ * base: /rest/v2/manager/query
+ * 1. /query_info
+ * 2. /sql/{query_id}
+ * 3. /profile/{format}/{query_id}
+ * 4. /trace_id/{trace_id}
+ * 5. /profile/fragments/{query_id}
+ * 6. /current_queries
+ * 7. /kill/{connection_id}
*/
@RestController
@RequestMapping("/rest/v2/manager/query")
@@ -81,10 +93,11 @@ public class QueryProfileAction extends RestBaseController {
private static final String FRAGMENT_ID = "fragment_id";
private static final String INSTANCE_ID = "instance_id";
- public static final ImmutableList<String> QUERY_TITLE_NAMES = new
ImmutableList.Builder<String>()
-
.add(QUERY_ID).add(NODE).add(USER).add(DEFAULT_DB).add(SQL_STATEMENT)
-
.add(QUERY_TYPE).add(START_TIME).add(END_TIME).add(TOTAL).add(QUERY_STATE)
- .build();
+ private static final String FRONTEND = "Frontend";
+
+ public static final ImmutableList<String> QUERY_TITLE_NAMES = new
ImmutableList.Builder<String>().add(QUERY_ID)
+
.add(NODE).add(USER).add(DEFAULT_DB).add(SQL_STATEMENT).add(QUERY_TYPE).add(START_TIME).add(END_TIME)
+ .add(TOTAL).add(QUERY_STATE).build();
private List<String> requestAllFe(String httpPath, Map<String, String>
arguments, String authorization) {
List<Pair<String, Integer>> frontends = HttpUtils.getFeList();
@@ -336,4 +349,130 @@ public class QueryProfileAction extends
RestBaseController {
}
return ResponseEntityBuilder.ok(graph);
}
-}
\ No newline at end of file
+
+    /**
+     * Returns the profile of a query rendered as a JSON string.
+     *
+     * <p>When {@code isAllNode} is true the lookup is delegated to
+     * getProfileFromAllFrontends with the "json" format. Otherwise the profile
+     * tree is read from the local ProfileManager: a fragment-level tree when
+     * either {@code fragmentId} or {@code instanceId} is empty, an
+     * instance-level tree when both are given.
+     *
+     * <p>A failure while building the local profile is logged and not
+     * propagated; the response is then still "ok" but its map has no
+     * "profile" entry.
+     *
+     * @param request    the incoming HTTP request, forwarded when querying other frontends
+     * @param queryId    id of the query whose profile is requested
+     * @param fragmentId fragment id, may be null or empty
+     * @param instanceId instance id, may be null or empty
+     * @param isAllNode  whether to look the profile up on all frontends
+     * @return an "ok" response whose data is a map holding the JSON text under the key "profile"
+     */
+    @NotNull
+    private ResponseEntity getJsonProfile(HttpServletRequest request, String queryId, String fragmentId,
+            String instanceId, boolean isAllNode) {
+        if (isAllNode) {
+            return getProfileFromAllFrontends(request, "json", queryId, fragmentId, instanceId);
+        }
+
+        Map<String, String> graph = Maps.newHashMap();
+        try {
+            JSONObject json;
+            if (Strings.isNullOrEmpty(fragmentId) || Strings.isNullOrEmpty(instanceId)) {
+                ProfileTreeNode treeRoot = ProfileManager.getInstance().getFragmentProfileTree(queryId, queryId);
+                json = ProfileTreePrinter.printFragmentTreeInJson(treeRoot, ProfileTreePrinter.PrintLevel.FRAGMENT);
+            } else {
+                ProfileTreeNode treeRoot = ProfileManager.getInstance()
+                        .getInstanceProfileTree(queryId, queryId, fragmentId, instanceId);
+                json = ProfileTreePrinter.printFragmentTreeInJson(treeRoot, ProfileTreePrinter.PrintLevel.INSTANCE);
+            }
+            graph.put("profile", json.toJSONString());
+        } catch (Exception e) {
+            // Best effort: keep answering "ok" with an empty map, but log under the right name
+            // (this is the JSON profile path, not the graph one).
+            LOG.warn("get profile json error, queryId:{}, fragmentId:{}, instanceId:{}", queryId, fragmentId,
+                    instanceId, e);
+        }
+        return ResponseEntityBuilder.ok(graph);
+    }
+
+    /**
+     * Requests a query profile in the given format through requestAllFe, with
+     * {@code is_all_node=false} so the receiving frontends answer from local
+     * state only, and returns the first profile found.
+     *
+     * <p>Responses without a "profile" field are skipped rather than
+     * dereferenced, so a frontend that does not hold the profile no longer
+     * turns the whole request into a bad request with a null message.
+     *
+     * @param request    the incoming HTTP request; its authorization header is forwarded
+     * @param format     profile format segment of the path, e.g. "json"
+     * @param queryId    id of the query whose profile is requested
+     * @param fragmentId optional fragment id, forwarded only when non-empty
+     * @param instanceId optional instance id, forwarded only when non-empty
+     * @return "ok" with a map holding the profile under the key "profile" (an empty map when
+     *         no response was collected); a bad request when a response cannot be parsed or
+     *         none of the responses carries a profile
+     */
+    @NotNull
+    private ResponseEntity getProfileFromAllFrontends(HttpServletRequest request, String format, String queryId,
+            String fragmentId, String instanceId) {
+        String httpPath = "/rest/v2/manager/query/profile/" + format + "/" + queryId;
+        ImmutableMap.Builder<String, String> builder =
+                ImmutableMap.<String, String>builder().put(IS_ALL_NODE_PARA, "false");
+        if (!Strings.isNullOrEmpty(fragmentId)) {
+            builder.put(FRAGMENT_ID, fragmentId);
+        }
+        if (!Strings.isNullOrEmpty(instanceId)) {
+            builder.put(INSTANCE_ID, instanceId);
+        }
+        List<String> dataList = requestAllFe(httpPath, builder.build(), request.getHeader(NodeAction.AUTHORIZATION));
+        Map<String, String> result = Maps.newHashMap();
+        if (dataList.isEmpty()) {
+            return ResponseEntityBuilder.ok(result);
+        }
+        try {
+            for (String data : dataList) {
+                // Fully qualified so that no additional import is required.
+                com.google.gson.JsonObject body = JsonParser.parseString(data).getAsJsonObject();
+                if (body.has("profile") && !body.get("profile").isJsonNull()) {
+                    result.put("profile", body.get("profile").getAsString());
+                    return ResponseEntityBuilder.ok(result);
+                }
+            }
+        } catch (Exception e) {
+            return ResponseEntityBuilder.badRequest(e.getMessage());
+        }
+        return ResponseEntityBuilder.badRequest("profile not found, queryId: " + queryId);
+    }
+
+    /**
+     * Handles {@code GET /rest/v2/manager/query/current_queries}: returns the
+     * result of CurrentQueryStatementsProcNode with an additional leading
+     * "Frontend" column.
+     *
+     * <p>The caller must pass the password check and hold the global ADMIN
+     * privilege; both checks run before any data is collected.
+     *
+     * <p>When {@code isAllNode} is true, the same path is requested through
+     * requestAllFe with {@code is_all_node=false} and the rows of every
+     * response are merged; a response that cannot be parsed is logged and
+     * skipped. When it is false, the rows come from the local proc node and
+     * the local host address is inserted as the first column of each row.
+     *
+     * @param request   the HTTP request; its authorization header is forwarded to other frontends
+     * @param response  the HTTP response, used by the password check
+     * @param isAllNode whether to collect queries from all frontends (defaults to true)
+     * @return an "ok" response wrapping a NodeInfo of column titles and rows, or a bad request
+     *         carrying the message of an AnalysisException raised while reading the local proc node
+     */
+    @RequestMapping(path = "/current_queries", method = RequestMethod.GET)
+    public Object currentQueries(HttpServletRequest request,
HttpServletResponse response,
+            @RequestParam(value = IS_ALL_NODE_PARA, required = false,
defaultValue = "true") boolean isAllNode) {
+        executeCheckPassword(request, response);
+        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(),
PrivPredicate.ADMIN);
+
+        if (isAllNode) {
+            // Fan the request out with is_all_node=false and merge the rows of every response.
+            String httpPath = "/rest/v2/manager/query/current_queries";
+            Map<String, String> arguments = Maps.newHashMap();
+            arguments.put(IS_ALL_NODE_PARA, "false");
+            List<List<String>> queries = Lists.newArrayList();
+            List<String> dataList = requestAllFe(httpPath, arguments,
request.getHeader(NodeAction.AUTHORIZATION));
+            for (String data : dataList) {
+                try {
+                    NodeAction.NodeInfo nodeInfo =
GsonUtils.GSON.fromJson(data, new TypeToken<NodeAction.NodeInfo>() {
+                    }.getType());
+                    queries.addAll(nodeInfo.getRows());
+                } catch (Exception e) {
+                    LOG.warn("parse query info error: {}", data, e);
+                }
+            }
+            List<String> titles =
Lists.newArrayList(CurrentQueryStatementsProcNode.TITLE_NAMES);
+            titles.add(0, FRONTEND);
+            return ResponseEntityBuilder.ok(new NodeAction.NodeInfo(titles,
queries));
+        } else {
+            try {
+                CurrentQueryStatementsProcNode node = new
CurrentQueryStatementsProcNode();
+                ProcResult result = node.fetchResult();
+                // Prepend the "Frontend" title and this FE's host address as the first column.
+                List<String> titles =
Lists.newArrayList(CurrentQueryStatementsProcNode.TITLE_NAMES);
+                titles.add(0, FRONTEND);
+                List<List<String>> rows = result.getRows();
+                String feIp = FrontendOptions.getLocalHostAddress();
+                for (List<String> row : rows) {
+                    row.add(0, feIp);
+                }
+                return ResponseEntityBuilder.ok(new
NodeAction.NodeInfo(titles, rows));
+            } catch (AnalysisException e) {
+                return ResponseEntityBuilder.badRequest(e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * Handles {@code POST /rest/v2/manager/query/kill/{connection_id}}: cancels
+     * the query of the connection with the given id.
+     *
+     * <p>The caller must pass the password check and hold the global ADMIN
+     * privilege. The connection is looked up in the scheduler obtained from
+     * ExecuteEnv.getInstance(), and the cancellation is done through
+     * ConnectContext.cancelQuery().
+     *
+     * <p>NOTE(review): the lookup goes through this process's ExecuteEnv, so a
+     * connection id reported for a different frontend would presumably not be
+     * found here; confirm whether callers must address the owning frontend.
+     *
+     * @param request      the HTTP request, used by the password check
+     * @param response     the HTTP response, used by the password check
+     * @param connectionId id of the connection whose query should be cancelled
+     * @return an "ok" response after cancelQuery() has been called, or a not-found response
+     *         with the message "connection not found" when no context exists for the id
+     */
+    @RequestMapping(path = "/kill/{connection_id}", method =
RequestMethod.POST)
+    public Object killQuery(HttpServletRequest request, HttpServletResponse
response,
+            @PathVariable("connection_id") int connectionId) {
+        executeCheckPassword(request, response);
+        checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(),
PrivPredicate.ADMIN);
+
+        ExecuteEnv env = ExecuteEnv.getInstance();
+        ConnectContext ctx = env.getScheduler().getContext(connectionId);
+        if (ctx == null) {
+            return ResponseEntityBuilder.notFound("connection not found");
+        }
+        ctx.cancelQuery();
+        return ResponseEntityBuilder.ok();
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 0e5c6565ba..ce9a13b705 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -470,7 +470,11 @@ public class ConnectContext {
// Close channel to break connection with client
getMysqlChannel().close();
}
- // Now, cancel running process.
+ // Now, cancel running query.
+ cancelQuery();
+ }
+
+ public void cancelQuery() {
StmtExecutor executorRef = executor;
if (executorRef != null) {
executorRef.cancel();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]