This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 207aba4d0b4 Refactor active queries (#31742)
207aba4d0b4 is described below
commit 207aba4d0b434cf7394d825ba7e1fbc96242723d
Author: wangbo <[email protected]>
AuthorDate: Tue Mar 5 13:51:36 2024 +0800
Refactor active queries (#31742)
---
.../table-functions/active_queries.md | 45 ++++++++----------
.../table-functions/active_queries.md | 45 ++++++++----------
.../commands/insert/AbstractInsertExecutor.java | 4 +-
.../main/java/org/apache/doris/qe/Coordinator.java | 4 ++
.../java/org/apache/doris/qe/StmtExecutor.java | 5 +-
.../ActiveQueriesTableValuedFunction.java | 9 ----
.../doris/tablefunction/MetadataGenerator.java | 54 +++++++++++-----------
7 files changed, 74 insertions(+), 92 deletions(-)
diff --git a/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md b/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md
index 35a71b5eb60..cbc0e20845d 100644
--- a/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md
+++ b/docs/en/docs/sql-manual/sql-functions/table-functions/active_queries.md
@@ -45,37 +45,30 @@ This function is used in FROM clauses.
active_queries() table schema:
```
-mysql [(none)]> desc function active_queries();
-+------------------------+--------+------+-------+---------+-------+
-| Field | Type | Null | Key | Default | Extra |
-+------------------------+--------+------+-------+---------+-------+
-| BeHost | TEXT | No | false | NULL | NONE |
-| BePort | BIGINT | No | false | NULL | NONE |
-| QueryId | TEXT | No | false | NULL | NONE |
-| StartTime | TEXT | No | false | NULL | NONE |
-| QueryTimeMs | BIGINT | No | false | NULL | NONE |
-| WorkloadGroupId | BIGINT | No | false | NULL | NONE |
-| QueryCpuTimeMs | BIGINT | No | false | NULL | NONE |
-| ScanRows | BIGINT | No | false | NULL | NONE |
-| ScanBytes | BIGINT | No | false | NULL | NONE |
-| BePeakMemoryBytes | BIGINT | No | false | NULL | NONE |
-| CurrentUsedMemoryBytes | BIGINT | No | false | NULL | NONE |
-| Database | TEXT | No | false | NULL | NONE |
-| FrontendInstance | TEXT | No | false | NULL | NONE |
-| Sql | TEXT | No | false | NULL | NONE |
-+------------------------+--------+------+-------+---------+-------+
-14 rows in set (0.00 sec)
+mysql [(none)]>desc function active_queries();
++------------------+--------+------+-------+---------+-------+
+| Field | Type | Null | Key | Default | Extra |
++------------------+--------+------+-------+---------+-------+
+| QueryId | TEXT | No | false | NULL | NONE |
+| StartTime | TEXT | No | false | NULL | NONE |
+| QueryTimeMs | BIGINT | No | false | NULL | NONE |
+| WorkloadGroupId | BIGINT | No | false | NULL | NONE |
+| Database | TEXT | No | false | NULL | NONE |
+| FrontendInstance | TEXT | No | false | NULL | NONE |
+| Sql | TEXT | No | false | NULL | NONE |
++------------------+--------+------+-------+---------+-------+
+7 rows in set (0.00 sec)
```
### example
```
mysql [(none)]>select * from active_queries();
-+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
-| BeHost | BePort | QueryId | StartTime | QueryTimeMs | WorkloadGroupId | QueryCpuTimeMs | ScanRows | ScanBytes | BePeakMemoryBytes | CurrentUsedMemoryBytes | Database | FrontendInstance | Sql |
-+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
-| 127.0.0.1 | 6090 | 71fd11b7b0e438c-bc98434b97b8cb98 | 2024-01-16 16:21:15 | 7260 | 10002 | 8392 | 16082249 | 4941889536 | 360470040 | 360420915 | hits | localhost | SELECT xxxx |
-+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
-1 row in set (0.01 sec)
++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
+| QueryId | StartTime | QueryTimeMs | WorkloadGroupId | Database | FrontendInstance | Sql |
++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
+| a84bf9f3ea6348e1-ac542839f8f2af5d | 2024-03-04 17:33:09 | 9 | 10002 | | localhost | select * from active_queries() |
++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
+1 row in set (0.03 sec)
```
### keywords
diff --git a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md
index bdae08285f2..feda3c128ca 100644
--- a/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md
+++ b/docs/zh-CN/docs/sql-manual/sql-functions/table-functions/active_queries.md
@@ -45,37 +45,30 @@ active_queries
active_queries()表结构:
```
-mysql [(none)]> desc function active_queries();
-+------------------------+--------+------+-------+---------+-------+
-| Field | Type | Null | Key | Default | Extra |
-+------------------------+--------+------+-------+---------+-------+
-| BeHost | TEXT | No | false | NULL | NONE |
-| BePort | BIGINT | No | false | NULL | NONE |
-| QueryId | TEXT | No | false | NULL | NONE |
-| StartTime | TEXT | No | false | NULL | NONE |
-| QueryTimeMs | BIGINT | No | false | NULL | NONE |
-| WorkloadGroupId | BIGINT | No | false | NULL | NONE |
-| QueryCpuTimeMs | BIGINT | No | false | NULL | NONE |
-| ScanRows | BIGINT | No | false | NULL | NONE |
-| ScanBytes | BIGINT | No | false | NULL | NONE |
-| BePeakMemoryBytes | BIGINT | No | false | NULL | NONE |
-| CurrentUsedMemoryBytes | BIGINT | No | false | NULL | NONE |
-| Database | TEXT | No | false | NULL | NONE |
-| FrontendInstance | TEXT | No | false | NULL | NONE |
-| Sql | TEXT | No | false | NULL | NONE |
-+------------------------+--------+------+-------+---------+-------+
-14 rows in set (0.00 sec)
+mysql [(none)]>desc function active_queries();
++------------------+--------+------+-------+---------+-------+
+| Field | Type | Null | Key | Default | Extra |
++------------------+--------+------+-------+---------+-------+
+| QueryId | TEXT | No | false | NULL | NONE |
+| StartTime | TEXT | No | false | NULL | NONE |
+| QueryTimeMs | BIGINT | No | false | NULL | NONE |
+| WorkloadGroupId | BIGINT | No | false | NULL | NONE |
+| Database | TEXT | No | false | NULL | NONE |
+| FrontendInstance | TEXT | No | false | NULL | NONE |
+| Sql | TEXT | No | false | NULL | NONE |
++------------------+--------+------+-------+---------+-------+
+7 rows in set (0.00 sec)
```
### example
```
mysql [(none)]>select * from active_queries();
-+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
-| BeHost | BePort | QueryId | StartTime | QueryTimeMs | WorkloadGroupId | QueryCpuTimeMs | ScanRows | ScanBytes | BePeakMemoryBytes | CurrentUsedMemoryBytes | Database | FrontendInstance | Sql |
-+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
-| 127.0.0.1 | 6090 | 71fd11b7b0e438c-bc98434b97b8cb98 | 2024-01-16 16:21:15 | 7260 | 10002 | 8392 | 16082249 | 4941889536 | 360470040 | 360420915 | hits | localhost | SELECT xxxx |
-+------------+--------+----------------------------------+---------------------+-------------+-----------------+----------------+----------+------------+-------------------+------------------------+----------+------------------+-------+
-1 row in set (0.01 sec)
++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
+| QueryId | StartTime | QueryTimeMs | WorkloadGroupId | Database | FrontendInstance | Sql |
++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
+| a84bf9f3ea6348e1-ac542839f8f2af5d | 2024-03-04 17:33:09 | 9 | 10002 | | localhost | select * from active_queries() |
++-----------------------------------+---------------------+-------------+-----------------+----------+------------------+--------------------------------+
+1 row in set (0.03 sec)
```
### keywords
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index 34c7ad5f596..2af6212808d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -31,6 +31,7 @@ import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
+import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TQueryType;
@@ -110,7 +111,8 @@ public abstract class AbstractInsertExecutor {
coordinator.setLoadZeroTolerance(ctx.getSessionVariable().getEnableInsertStrict());
coordinator.setQueryType(TQueryType.LOAD);
executor.getProfile().setExecutionProfile(coordinator.getExecutionProfile());
- QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), coordinator);
+ QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), executor.getOriginStmtInString(), coordinator);
+ QeProcessorImpl.INSTANCE.registerQuery(ctx.queryId(), queryInfo);
coordinator.exec();
int execTimeout = ctx.getExecTimeout();
if (LOG.isDebugEnabled()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 8c764767c9c..307025f7c3f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -270,6 +270,10 @@ public class Coordinator implements CoordInterface {
this.tWorkloadGroups = tWorkloadGroups;
}
+ public List<TPipelineWorkloadGroup> gettWorkloadGroups() {
+ return tWorkloadGroups;
+ }
+
private List<TPipelineWorkloadGroup> tWorkloadGroups = Lists.newArrayList();
private final ExecutionProfile executionProfile;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index dbd4357906f..725113c08f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -150,6 +150,7 @@ import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
import org.apache.doris.qe.ConnectContext.ConnectType;
+import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.Cache;
import org.apache.doris.qe.cache.CacheAnalyzer;
@@ -2060,8 +2061,8 @@ public class StmtExecutor {
coord.setLoadZeroTolerance(context.getSessionVariable().getEnableInsertStrict());
coord.setQueryType(TQueryType.LOAD);
profile.setExecutionProfile(coord.getExecutionProfile());
-
- QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord);
+ QueryInfo queryInfo = new QueryInfo(ConnectContext.get(), this.getOriginStmtInString(), coord);
+ QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), queryInfo);
Table table = insertStmt.getTargetTable();
if (table instanceof OlapTable) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
index 27f65ed7680..c4bdaaed659 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ActiveQueriesTableValuedFunction.java
@@ -35,19 +35,10 @@ public class ActiveQueriesTableValuedFunction extends MetadataTableValuedFunctio
public static final String NAME = "active_queries";
private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
- new Column("BeHost", ScalarType.createStringType()),
- new Column("BePort", PrimitiveType.BIGINT),
new Column("QueryId", ScalarType.createStringType()),
new Column("StartTime", ScalarType.createStringType()),
new Column("QueryTimeMs", PrimitiveType.BIGINT),
new Column("WorkloadGroupId", PrimitiveType.BIGINT),
- new Column("QueryCpuTimeMs", PrimitiveType.BIGINT),
- new Column("ScanRows", PrimitiveType.BIGINT),
- new Column("ScanBytes", PrimitiveType.BIGINT),
- new Column("BePeakMemoryBytes", PrimitiveType.BIGINT),
- new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT),
- new Column("ShuffleSendBytes", PrimitiveType.BIGINT),
- new Column("ShuffleSendRows", PrimitiveType.BIGINT),
new Column("Database", ScalarType.createStringType()),
new Column("FrontendInstance", ScalarType.createStringType()),
new Column("Sql", ScalarType.createStringType()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 8a3df743e24..47de3be4b66 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -53,6 +53,7 @@ import org.apache.doris.thrift.TMaterializedViewsMetadataParams;
import org.apache.doris.thrift.TMetadataTableRequestParams;
import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TQueriesMetadataParams;
import org.apache.doris.thrift.TQueryStatistics;
import org.apache.doris.thrift.TRow;
@@ -78,7 +79,6 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
public class MetadataGenerator {
@@ -473,7 +473,7 @@ public class MetadataGenerator {
}
private static TFetchSchemaTableDataResult queriesMetadataResult(TMetadataTableRequestParams params,
- TFetchSchemaTableDataRequest parentRequest) {
+ TFetchSchemaTableDataRequest parentRequest) {
if (!params.isSetQueriesMetadataParams()) {
return errorResult("queries metadata param is not set.");
}
@@ -487,37 +487,35 @@ public class MetadataGenerator {
}
selfNode = NetUtils.getHostnameByIp(selfNode);
- // get query
- Map<Long, Map<String, TQueryStatistics>> beQsMap = Env.getCurrentEnv().getWorkloadRuntimeStatusMgr()
- .getBeQueryStatsMap();
- Set<Long> beIdSet = beQsMap.keySet();
-
List<TRow> dataBatch = Lists.newArrayList();
Map<String, QueryInfo> queryInfoMap = QeProcessorImpl.INSTANCE.getQueryInfoMap();
-
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- for (Long beId : beIdSet) {
- Map<String, TQueryStatistics> qsMap = beQsMap.get(beId);
- if (qsMap == null) {
- continue;
+ for (Map.Entry<String, QueryInfo> entry : queryInfoMap.entrySet()) {
+ String queryId = entry.getKey();
+ QueryInfo queryInfo = entry.getValue();
+
+ TRow trow = new TRow();
+ trow.addToColumnValue(new TCell().setStringVal(queryId));
+
+ String strDate = sdf.format(new Date(queryInfo.getStartExecTime()));
+ trow.addToColumnValue(new TCell().setStringVal(strDate));
+ trow.addToColumnValue(new TCell().setLongVal(System.currentTimeMillis() - queryInfo.getStartExecTime()));
+
+ List<TPipelineWorkloadGroup> tgroupList = queryInfo.getCoord().gettWorkloadGroups();
+ if (tgroupList != null && tgroupList.size() == 1) {
+ trow.addToColumnValue(new TCell().setLongVal(tgroupList.get(0).id));
+ } else {
+ trow.addToColumnValue(new TCell().setLongVal(-1));
}
- Set<String> queryIdSet = qsMap.keySet();
- for (String queryId : queryIdSet) {
- QueryInfo queryInfo = queryInfoMap.get(queryId);
- if (queryInfo == null) {
- continue;
- }
- //todo(wb) add connect context for insert select
- if (queryInfo.getConnectContext() != null && !Env.getCurrentEnv().getAccessManager()
- .checkDbPriv(queryInfo.getConnectContext(), queryInfo.getConnectContext().getDatabase(),
- PrivPredicate.SELECT)) {
- continue;
- }
- TQueryStatistics qs = qsMap.get(queryId);
- Backend be = Env.getCurrentEnv().getClusterInfo().getBackend(beId);
- TRow tRow = makeQueryStatisticsTRow(sdf, queryId, be, selfNode, queryInfo, qs);
- dataBatch.add(tRow);
+
+ if (queryInfo.getConnectContext() != null) {
+ trow.addToColumnValue(new TCell().setStringVal(queryInfo.getConnectContext().getDatabase()));
+ } else {
+ trow.addToColumnValue(new TCell().setStringVal(""));
}
+ trow.addToColumnValue(new TCell().setStringVal(selfNode));
+ trow.addToColumnValue(new TCell().setStringVal(queryInfo.getSql()));
+ dataBatch.add(trow);
}
/* Get the query results from other FE also */
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]