This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 94461b06509 Support system table current_queries and
queries_costs_histogram (#16890)
94461b06509 is described below
commit 94461b065095dd8499c85b8691d89da35a878299
Author: Weihao Li <[email protected]>
AuthorDate: Wed Dec 10 19:36:32 2025 +0800
Support system table current_queries and queries_costs_histogram (#16890)
---
.../it/env/cluster/config/MppDataNodeConfig.java | 6 +
.../it/env/remote/config/RemoteDataNodeConfig.java | 5 +
.../apache/iotdb/itbase/env/DataNodeConfig.java | 2 +
.../informationschema/IoTDBCurrentQueriesIT.java | 205 ++++++++++++++
.../relational/it/schema/IoTDBDatabaseIT.java | 6 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 +
.../protocol/rest/v2/impl/RestApiServiceImpl.java | 16 +-
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 16 +-
.../iotdb/db/queryengine/common/QueryId.java | 7 +
.../InformationSchemaContentSupplierFactory.java | 146 +++++++++-
.../iotdb/db/queryengine/plan/Coordinator.java | 301 ++++++++++++++++++++-
.../plan/execution/IQueryExecution.java | 2 +
.../queryengine/plan/execution/QueryExecution.java | 5 +
.../plan/execution/config/ConfigExecution.java | 4 +
.../DataNodeLocationSupplierFactory.java | 2 +
.../operator/MergeTreeSortOperatorTest.java | 5 +
.../plan/relational/planner/PlanTester.java | 2 +
.../informationschema/CurrentQueriesTest.java | 107 ++++++++
.../informationschema}/ShowQueriesTest.java | 8 +-
.../conf/iotdb-system.properties.template | 6 +
.../iotdb/commons/concurrent/ThreadName.java | 1 +
.../schema/column/ColumnHeaderConstant.java | 10 +-
.../commons/schema/table/InformationSchema.java | 41 ++-
24 files changed, 895 insertions(+), 24 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 357c15c7856..5e418072a7d 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -137,4 +137,10 @@ public class MppDataNodeConfig extends MppBaseConfig
implements DataNodeConfig {
setProperty("datanode_memory_proportion", dataNodeMemoryProportion);
return this;
}
+
+ @Override
+ public DataNodeConfig setQueryCostStatWindow(int queryCostStatWindow) {
+ setProperty("query_cost_stat_window", String.valueOf(queryCostStatWindow));
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index 1af7cb8f613..bba4c964f95 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -93,4 +93,9 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
public DataNodeConfig setDataNodeMemoryProportion(String
dataNodeMemoryProportion) {
return this;
}
+
+ @Override
+ public DataNodeConfig setQueryCostStatWindow(int queryCostStatWindow) {
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index 0ae46ffc70f..d57015b1396 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -51,4 +51,6 @@ public interface DataNodeConfig {
DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs);
DataNodeConfig setDataNodeMemoryProportion(String dataNodeMemoryProportion);
+
+ DataNodeConfig setQueryCostStatWindow(int queryCostStatWindow);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/informationschema/IoTDBCurrentQueriesIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/informationschema/IoTDBCurrentQueriesIT.java
new file mode 100644
index 00000000000..66941ec784f
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/informationschema/IoTDBCurrentQueriesIT.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.query.recent.informationschema;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.queryengine.execution.QueryState;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.END_TIME_TABLE_MODEL;
+import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.NUMS;
+import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATEMENT_TABLE_MODEL;
+import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATE_TABLE_MODEL;
+import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.USER_TABLE_MODEL;
+import static
org.apache.iotdb.commons.schema.table.InformationSchema.getSchemaTables;
+import static org.apache.iotdb.db.it.utils.TestUtils.createUser;
+import static org.apache.iotdb.itbase.env.BaseEnv.TABLE_SQL_DIALECT;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class})
+// This IT will run at least 60s, so we only run it in 1C1D
+public class IoTDBCurrentQueriesIT {
+ private static final int CURRENT_QUERIES_COLUMN_NUM =
+ getSchemaTables().get("current_queries").getColumnNum();
+ private static final int QUERIES_COSTS_HISTOGRAM_COLUMN_NUM =
+ getSchemaTables().get("queries_costs_histogram").getColumnNum();
+ private static final String ADMIN_NAME =
+ CommonDescriptor.getInstance().getConfig().getDefaultAdminName();
+ private static final String ADMIN_PWD =
+ CommonDescriptor.getInstance().getConfig().getAdminPassword();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+
EnvFactory.getEnv().getConfig().getDataNodeConfig().setQueryCostStatWindow(1);
+ EnvFactory.getEnv().initClusterEnvironment();
+ createUser("test", "test123123456");
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testCurrentQueries() {
+ try {
+ Connection connection =
+ EnvFactory.getEnv().getConnection(ADMIN_NAME, ADMIN_PWD,
BaseEnv.TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement();
+ statement.execute("USE information_schema");
+
+ // 1. query current_queries table
+ String sql = "SELECT * FROM current_queries";
+ ResultSet resultSet = statement.executeQuery(sql);
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ Assert.assertEquals(CURRENT_QUERIES_COLUMN_NUM,
metaData.getColumnCount());
+ int rowNum = 0;
+ while (resultSet.next()) {
+ Assert.assertEquals(QueryState.RUNNING.name(),
resultSet.getString(STATE_TABLE_MODEL));
+ Assert.assertEquals(null, resultSet.getString(END_TIME_TABLE_MODEL));
+ Assert.assertEquals(sql, resultSet.getString(STATEMENT_TABLE_MODEL));
+ Assert.assertEquals(ADMIN_NAME, resultSet.getString(USER_TABLE_MODEL));
+ rowNum++;
+ }
+ Assert.assertEquals(1, rowNum);
+ resultSet.close();
+
+ // 2. query queries_costs_histogram table
+ sql = "SELECT * FROM queries_costs_histogram";
+ resultSet = statement.executeQuery(sql);
+ metaData = resultSet.getMetaData();
+ Assert.assertEquals(QUERIES_COSTS_HISTOGRAM_COLUMN_NUM,
metaData.getColumnCount());
+ rowNum = 0;
+ int queriesCount = 0;
+ while (resultSet.next()) {
+ int nums = resultSet.getInt(NUMS);
+ if (nums > 0) {
+ queriesCount++;
+ }
+ rowNum++;
+ }
+ Assert.assertEquals(1, queriesCount);
+ Assert.assertEquals(61, rowNum);
+
+ // 3. requery current_queries table
+ sql = "SELECT * FROM current_queries";
+ resultSet = statement.executeQuery(sql);
+ metaData = resultSet.getMetaData();
+ Assert.assertEquals(CURRENT_QUERIES_COLUMN_NUM,
metaData.getColumnCount());
+ rowNum = 0;
+ int finishedQueries = 0;
+ while (resultSet.next()) {
+ if
(QueryState.FINISHED.name().equals(resultSet.getString(STATE_TABLE_MODEL))) {
+ finishedQueries++;
+ }
+ rowNum++;
+ }
+ // three rows in the result, 2 FINISHED and 1 RUNNING
+ Assert.assertEquals(3, rowNum);
+ Assert.assertEquals(2, finishedQueries);
+ resultSet.close();
+
+ // 4. test the expired QueryInfo was evicted
+ Thread.sleep(61_001);
+ resultSet = statement.executeQuery(sql);
+ rowNum = 0;
+ while (resultSet.next()) {
+ rowNum++;
+ }
+ // one row in the result, current query
+ Assert.assertEquals(1, rowNum);
+ resultSet.close();
+
+ sql = "SELECT * FROM queries_costs_histogram";
+ resultSet = statement.executeQuery(sql);
+ queriesCount = 0;
+ while (resultSet.next()) {
+ int nums = resultSet.getInt(NUMS);
+ if (nums > 0) {
+ queriesCount++;
+ }
+ }
+ // the last current_queries table query was recorded, others are evicted
+ Assert.assertEquals(1, queriesCount);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+
+ // 5. test privilege
+ testPrivilege();
+ }
+
+ private void testPrivilege() {
+ // 1. test current_queries table
+ try (Connection connection =
+ EnvFactory.getEnv().getConnection("test", "test123123456",
TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ String sql = "SELECT * FROM information_schema.current_queries";
+
+ // another user executes a query
+ try (Connection connection2 =
+ EnvFactory.getEnv().getConnection(ADMIN_NAME, ADMIN_PWD,
BaseEnv.TABLE_SQL_DIALECT)) {
+ ResultSet resultSet = connection2.createStatement().executeQuery(sql);
+ resultSet.close();
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+
+ // current user query current_queries table
+ ResultSet resultSet = statement.executeQuery(sql);
+ int rowNum = 0;
+ while (resultSet.next()) {
+ rowNum++;
+ }
+ // only current query in the result
+ Assert.assertEquals(1, rowNum);
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // 2. test queries_costs_histogram table
+ try (Connection connection =
+ EnvFactory.getEnv().getConnection("test", "test123123456",
TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ statement.executeQuery("SELECT * FROM
information_schema.queries_costs_histogram");
+ } catch (SQLException e) {
+ Assert.assertEquals(
+ "803: Access Denied: No permissions for this operation, please add
privilege SYSTEM",
+ e.getMessage());
+ }
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
index e7bab16ad1f..4736d9b0521 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
@@ -399,6 +399,7 @@ public class IoTDBDatabaseIT {
"config_nodes,INF,",
"configurations,INF,",
"connections,INF,",
+ "current_queries,INF,",
"data_nodes,INF,",
"databases,INF,",
"functions,INF,",
@@ -407,6 +408,7 @@ public class IoTDBDatabaseIT {
"pipe_plugins,INF,",
"pipes,INF,",
"queries,INF,",
+ "queries_costs_histogram,INF,",
"regions,INF,",
"subscriptions,INF,",
"tables,INF,",
@@ -634,12 +636,14 @@ public class IoTDBDatabaseIT {
"information_schema,config_nodes,INF,USING,null,SYSTEM
VIEW,",
"information_schema,data_nodes,INF,USING,null,SYSTEM VIEW,",
"information_schema,connections,INF,USING,null,SYSTEM VIEW,",
+ "information_schema,current_queries,INF,USING,null,SYSTEM
VIEW,",
+
"information_schema,queries_costs_histogram,INF,USING,null,SYSTEM VIEW,",
"test,test,INF,USING,test,BASE TABLE,",
"test,view_table,100,USING,null,VIEW FROM TREE,")));
TestUtils.assertResultSetEqual(
statement.executeQuery("count devices from tables where status =
'USING'"),
"count(devices),",
- Collections.singleton("19,"));
+ Collections.singleton("21,"));
TestUtils.assertResultSetEqual(
statement.executeQuery(
"select * from columns where table_name = 'queries' or database
= 'test'"),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index edb1d1c2674..9f67ab268c2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -814,6 +814,9 @@ public class IoTDBConfig {
/** time cost(ms) threshold for slow query. Unit: millisecond */
private long slowQueryThreshold = 10000;
+ /** time window threshold for record of history queries. Unit: minute */
+ private int queryCostStatWindow = 0;
+
private int patternMatchingThreshold = 1000000;
/**
@@ -2627,6 +2630,14 @@ public class IoTDBConfig {
this.slowQueryThreshold = slowQueryThreshold;
}
+ public int getQueryCostStatWindow() {
+ return queryCostStatWindow;
+ }
+
+ public void setQueryCostStatWindow(int queryCostStatWindow) {
+ this.queryCostStatWindow = queryCostStatWindow;
+ }
+
public boolean isEnableIndex() {
return enableIndex;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d32b0b51f57..cfdb4c141b4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -813,6 +813,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"slow_query_threshold",
String.valueOf(conf.getSlowQueryThreshold()))));
+ conf.setQueryCostStatWindow(
+ Integer.parseInt(
+ properties.getProperty(
+ "query_cost_stat_window",
String.valueOf(conf.getQueryCostStatWindow()))));
+
conf.setDataRegionNum(
Integer.parseInt(
properties.getProperty("data_region_num",
String.valueOf(conf.getDataRegionNum()))));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
index 66be144edef..b657523987d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
@@ -212,7 +212,8 @@ public class RestApiServiceImpl extends RestApiService {
t = e;
return
Response.ok().entity(ExceptionHandler.tryCatchException(e)).build();
} finally {
- long costTime = System.nanoTime() - startTime;
+ long endTime = System.nanoTime();
+ long costTime = endTime - startTime;
StatementType statementType =
Optional.ofNullable(statement)
@@ -227,7 +228,18 @@ public class RestApiServiceImpl extends RestApiService {
if (queryId != null) {
COORDINATOR.cleanupQueryExecution(queryId);
} else {
- recordQueries(() -> costTime, new
FastLastQueryContentSupplier(prefixPathList), t);
+ IClientSession clientSession = SESSION_MANAGER.getCurrSession();
+
+ Supplier<String> contentOfQuerySupplier = new
FastLastQueryContentSupplier(prefixPathList);
+ COORDINATOR.recordCurrentQueries(
+ null,
+ startTime / 1_000_000,
+ endTime / 1_000_000,
+ costTime,
+ contentOfQuerySupplier,
+ clientSession.getUsername(),
+ clientSession.getClientAddress());
+ recordQueries(() -> costTime, contentOfQuerySupplier, t);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 3d4222629d6..cb5347e51f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -1050,13 +1050,23 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
resp.setMoreData(false);
- long costTime = System.nanoTime() - startTime;
+ long endTime = System.nanoTime();
+ long costTime = endTime - startTime;
CommonUtils.addStatementExecutionLatency(
OperationType.EXECUTE_QUERY_STATEMENT,
StatementType.FAST_LAST_QUERY.name(), costTime);
CommonUtils.addQueryLatency(StatementType.FAST_LAST_QUERY, costTime);
- recordQueries(
- () -> costTime, () -> String.format("thrift fastLastQuery %s",
prefixPath), null);
+
+ String statement = String.format("thrift fastLastQuery %s", prefixPath);
+ COORDINATOR.recordCurrentQueries(
+ null,
+ startTime / 1_000_000,
+ endTime / 1_000_000,
+ costTime,
+ () -> statement,
+ clientSession.getUsername(),
+ clientSession.getClientAddress());
+ recordQueries(() -> costTime, () -> statement, null);
return resp;
} catch (final Exception e) {
return RpcUtils.getTSExecuteStatementResp(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java
index a59ce8334bf..44e67aa7ba6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.common;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -37,6 +38,8 @@ public class QueryId {
public static final QueryId MOCK_QUERY_ID = QueryId.valueOf("mock_query_id");
+ private static final int DATANODE_ID =
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
+
private final String id;
private int nextPlanNodeIndex;
@@ -67,6 +70,10 @@ public class QueryId {
return id;
}
+ public static int getDataNodeId() {
+ return DATANODE_ID;
+ }
+
@Override
public String toString() {
return id;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
index daceffce6b7..76d84c741de 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.ConnectionInfo;
+import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowCreateViewTask;
@@ -166,6 +167,10 @@ public class InformationSchemaContentSupplierFactory {
return new DataNodesSupplier(dataTypes, userEntity);
case InformationSchema.CONNECTIONS:
return new ConnectionsSupplier(dataTypes, userEntity);
+ case InformationSchema.CURRENT_QUERIES:
+ return new CurrentQueriesSupplier(dataTypes, userEntity);
+ case InformationSchema.QUERIES_COSTS_HISTOGRAM:
+ return new QueriesCostsHistogramSupplier(dataTypes, userEntity);
default:
throw new UnsupportedOperationException("Unknown table: " +
tableName);
}
@@ -201,14 +206,11 @@ public class InformationSchemaContentSupplierFactory {
final IQueryExecution queryExecution =
queryExecutions.get(nextConsumedIndex);
if
(queryExecution.getSQLDialect().equals(IClientSession.SqlDialect.TABLE)) {
- final String[] splits = queryExecution.getQueryId().split("_");
- final int dataNodeId = Integer.parseInt(splits[splits.length - 1]);
-
columnBuilders[0].writeBinary(BytesUtils.valueOf(queryExecution.getQueryId()));
columnBuilders[1].writeLong(
TimestampPrecisionUtils.convertToCurrPrecision(
queryExecution.getStartExecutionTime(),
TimeUnit.MILLISECONDS));
- columnBuilders[2].writeInt(dataNodeId);
+ columnBuilders[2].writeInt(QueryId.getDataNodeId());
columnBuilders[3].writeFloat(
(float) (currTime - queryExecution.getStartExecutionTime()) /
1000);
columnBuilders[4].writeBinary(
@@ -1181,4 +1183,140 @@ public class InformationSchemaContentSupplierFactory {
return sessionConnectionIterator.hasNext();
}
}
+
+ private static class CurrentQueriesSupplier extends TsBlockSupplier {
+ private int nextConsumedIndex;
+ private List<Coordinator.StatedQueriesInfo> queriesInfo;
+
+ private CurrentQueriesSupplier(final List<TSDataType> dataTypes, final
UserEntity userEntity) {
+ super(dataTypes);
+ queriesInfo = Coordinator.getInstance().getCurrentQueriesInfo();
+ try {
+ accessControl.checkUserGlobalSysPrivilege(userEntity);
+ } catch (final AccessDeniedException e) {
+ queriesInfo =
+ queriesInfo.stream()
+ .filter(iQueryInfo ->
userEntity.getUsername().equals(iQueryInfo.getUser()))
+ .collect(Collectors.toList());
+ }
+ }
+
+ @Override
+ protected void constructLine() {
+ final Coordinator.StatedQueriesInfo queryInfo =
queriesInfo.get(nextConsumedIndex);
+
columnBuilders[0].writeBinary(BytesUtils.valueOf(queryInfo.getQueryId()));
+
columnBuilders[1].writeBinary(BytesUtils.valueOf(queryInfo.getQueryState()));
+ columnBuilders[2].writeLong(
+ TimestampPrecisionUtils.convertToCurrPrecision(
+ queryInfo.getStartTime(), TimeUnit.MILLISECONDS));
+ if (queryInfo.getEndTime() == Coordinator.QueryInfo.DEFAULT_END_TIME) {
+ columnBuilders[3].appendNull();
+ } else {
+ columnBuilders[3].writeLong(
+ TimestampPrecisionUtils.convertToCurrPrecision(
+ queryInfo.getEndTime(), TimeUnit.MILLISECONDS));
+ }
+ columnBuilders[4].writeInt(QueryId.getDataNodeId());
+ columnBuilders[5].writeFloat(queryInfo.getCostTime());
+
columnBuilders[6].writeBinary(BytesUtils.valueOf(queryInfo.getStatement()));
+ columnBuilders[7].writeBinary(BytesUtils.valueOf(queryInfo.getUser()));
+
columnBuilders[8].writeBinary(BytesUtils.valueOf(queryInfo.getClientHost()));
+ resultBuilder.declarePosition();
+ nextConsumedIndex++;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return nextConsumedIndex < queriesInfo.size();
+ }
+ }
+
+ private static class QueriesCostsHistogramSupplier extends TsBlockSupplier {
+ private int nextConsumedIndex;
+ private static final Binary[] BUCKETS =
+ new Binary[] {
+ BytesUtils.valueOf("[0,1)"),
+ BytesUtils.valueOf("[1,2)"),
+ BytesUtils.valueOf("[2,3)"),
+ BytesUtils.valueOf("[3,4)"),
+ BytesUtils.valueOf("[4,5)"),
+ BytesUtils.valueOf("[5,6)"),
+ BytesUtils.valueOf("[6,7)"),
+ BytesUtils.valueOf("[7,8)"),
+ BytesUtils.valueOf("[8,9)"),
+ BytesUtils.valueOf("[9,10)"),
+ BytesUtils.valueOf("[10,11)"),
+ BytesUtils.valueOf("[11,12)"),
+ BytesUtils.valueOf("[12,13)"),
+ BytesUtils.valueOf("[13,14)"),
+ BytesUtils.valueOf("[14,15)"),
+ BytesUtils.valueOf("[15,16)"),
+ BytesUtils.valueOf("[16,17)"),
+ BytesUtils.valueOf("[17,18)"),
+ BytesUtils.valueOf("[18,19)"),
+ BytesUtils.valueOf("[19,20)"),
+ BytesUtils.valueOf("[20,21)"),
+ BytesUtils.valueOf("[21,22)"),
+ BytesUtils.valueOf("[22,23)"),
+ BytesUtils.valueOf("[23,24)"),
+ BytesUtils.valueOf("[24,25)"),
+ BytesUtils.valueOf("[25,26)"),
+ BytesUtils.valueOf("[26,27)"),
+ BytesUtils.valueOf("[27,28)"),
+ BytesUtils.valueOf("[28,29)"),
+ BytesUtils.valueOf("[29,30)"),
+ BytesUtils.valueOf("[30,31)"),
+ BytesUtils.valueOf("[31,32)"),
+ BytesUtils.valueOf("[32,33)"),
+ BytesUtils.valueOf("[33,34)"),
+ BytesUtils.valueOf("[34,35)"),
+ BytesUtils.valueOf("[35,36)"),
+ BytesUtils.valueOf("[36,37)"),
+ BytesUtils.valueOf("[37,38)"),
+ BytesUtils.valueOf("[38,39)"),
+ BytesUtils.valueOf("[39,40)"),
+ BytesUtils.valueOf("[40,41)"),
+ BytesUtils.valueOf("[41,42)"),
+ BytesUtils.valueOf("[42,43)"),
+ BytesUtils.valueOf("[43,44)"),
+ BytesUtils.valueOf("[44,45)"),
+ BytesUtils.valueOf("[45,46)"),
+ BytesUtils.valueOf("[46,47)"),
+ BytesUtils.valueOf("[47,48)"),
+ BytesUtils.valueOf("[48,49)"),
+ BytesUtils.valueOf("[49,50)"),
+ BytesUtils.valueOf("[50,51)"),
+ BytesUtils.valueOf("[51,52)"),
+ BytesUtils.valueOf("[52,53)"),
+ BytesUtils.valueOf("[53,54)"),
+ BytesUtils.valueOf("[54,55)"),
+ BytesUtils.valueOf("[55,56)"),
+ BytesUtils.valueOf("[56,57)"),
+ BytesUtils.valueOf("[57,58)"),
+ BytesUtils.valueOf("[58,59)"),
+ BytesUtils.valueOf("[59,60)"),
+ BytesUtils.valueOf("60+")
+ };
+ private final int[] currentQueriesCostHistogram;
+
+ private QueriesCostsHistogramSupplier(
+ final List<TSDataType> dataTypes, final UserEntity userEntity) {
+ super(dataTypes);
+ accessControl.checkUserGlobalSysPrivilege(userEntity);
+ currentQueriesCostHistogram =
Coordinator.getInstance().getCurrentQueriesCostHistogram();
+ }
+
+ @Override
+ protected void constructLine() {
+ columnBuilders[0].writeBinary(BUCKETS[nextConsumedIndex]);
+
columnBuilders[1].writeInt(currentQueriesCostHistogram[nextConsumedIndex]);
+ resultBuilder.declarePosition();
+ nextConsumedIndex++;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return nextConsumedIndex < 61;
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 7708f6c18cd..78cdee720da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -42,6 +43,7 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.execution.QueryIdGenerator;
+import org.apache.iotdb.db.queryengine.execution.QueryState;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import
org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
@@ -141,23 +143,37 @@ import
org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.thrift.TBase;
+import org.apache.tsfile.utils.Accountable;
+import org.apache.tsfile.utils.RamUsageEstimator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import static org.apache.iotdb.commons.utils.StatusUtils.needRetry;
+import static
org.apache.iotdb.db.queryengine.plan.Coordinator.QueryInfo.DEFAULT_END_TIME;
import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest;
+import static org.apache.tsfile.utils.RamUsageEstimator.shallowSizeOfInstance;
+import static org.apache.tsfile.utils.RamUsageEstimator.sizeOfCharArray;
/**
* The coordinator for MPP. It manages all the queries which are executed in
current Node. And it
@@ -203,12 +219,32 @@ public class Coordinator {
private final ConcurrentHashMap<Long, IQueryExecution> queryExecutionMap;
+ private final BlockingDeque<QueryInfo> currentQueriesInfo = new
LinkedBlockingDeque<>();
+ private final AtomicInteger[] currentQueriesCostHistogram = new
AtomicInteger[61];
+ private final ScheduledExecutorService retryFailTasksExecutor =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.EXPIRED_QUERIES_INFO_CLEAR.getName());
+
private final StatementRewrite statementRewrite;
private final List<PlanOptimizer> logicalPlanOptimizers;
private final List<PlanOptimizer> distributionPlanOptimizers;
private final DataNodeLocationSupplierFactory.DataNodeLocationSupplier
dataNodeLocationSupplier;
private final TypeManager typeManager;
+ {
+ for (int i = 0; i < 61; i++) {
+ currentQueriesCostHistogram[i] = new AtomicInteger();
+ }
+
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ retryFailTasksExecutor,
+ this::clearExpiredQueriesInfoTask,
+ 1_000L,
+ 1_000L,
+ TimeUnit.MILLISECONDS);
+ LOGGER.info("Expired-Queries-Info-Clear thread is successfully started.");
+ }
+
static {
coordinatorMemoryBlock =
IoTDBDescriptor.getInstance()
@@ -625,12 +661,22 @@ public class Coordinator {
try (SetThreadName threadName = new
SetThreadName(queryExecution.getQueryId())) {
LOGGER.debug("[CleanUpQuery]]");
queryExecution.stopAndCleanup(t);
+ boolean isUserQuery = queryExecution.isQuery() &&
queryExecution.isUserQuery();
+ Supplier<String> contentOfQuerySupplier =
+ new ContentOfQuerySupplier(nativeApiRequest, queryExecution);
+ if (isUserQuery) {
+ recordCurrentQueries(
+ queryExecution.getQueryId(),
+ queryExecution.getStartExecutionTime(),
+ System.currentTimeMillis(),
+ queryExecution.getTotalExecutionTime(),
+ contentOfQuerySupplier,
+ queryExecution.getUser(),
+ queryExecution.getClientHostname());
+ }
queryExecutionMap.remove(queryId);
- if (queryExecution.isQuery() && queryExecution.isUserQuery()) {
- recordQueries(
- queryExecution::getTotalExecutionTime,
- new ContentOfQuerySupplier(nativeApiRequest, queryExecution),
- t);
+ if (isUserQuery) {
+ recordQueries(queryExecution::getTotalExecutionTime,
contentOfQuerySupplier, t);
}
}
}
@@ -722,4 +768,249 @@ public class Coordinator {
public ExecutorService getDispatchExecutor() {
return dispatchExecutor;
}
+
+ /** record query info in memory data structure */
+ public void recordCurrentQueries(
+ String queryId,
+ long startTime,
+ long endTime,
+ long costTimeInNs,
+ Supplier<String> contentOfQuerySupplier,
+ String user,
+ String clientHost) {
+ if (CONFIG.getQueryCostStatWindow() <= 0) {
+ return;
+ }
+
+ if (queryId == null) {
+ // fast Last query API executeFastLastDataQueryForOnePrefixPath will
enter this
+ queryId = queryIdGenerator.createNextQueryId().getId();
+ }
+
+ // ns -> s
+ float costTimeInSeconds = costTimeInNs * 1e-9f;
+
+ QueryInfo queryInfo =
+ new QueryInfo(
+ queryId,
+ startTime,
+ endTime,
+ costTimeInSeconds,
+ contentOfQuerySupplier.get(),
+ user,
+ clientHost);
+
+ while
(!coordinatorMemoryBlock.allocate(RamUsageEstimator.sizeOfObject(queryInfo))) {
+ // try to release memory from the head of queue
+ QueryInfo queryInfoToRelease = currentQueriesInfo.poll();
+ if (queryInfoToRelease == null) {
+ // no element in the queue and the memory is still not enough, skip
this record
+ return;
+ } else {
+ // release memory and unrecord in histogram
+
coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfoToRelease));
+ unrecordInHistogram(queryInfoToRelease.costTime);
+ }
+ }
+
+ currentQueriesInfo.addLast(queryInfo);
+ recordInHistogram(costTimeInSeconds);
+ }
+
+ private void recordInHistogram(float costTimeInSeconds) {
+ int bucket = (int) costTimeInSeconds;
+ if (bucket < 60) {
+ currentQueriesCostHistogram[bucket].getAndIncrement();
+ } else {
+ currentQueriesCostHistogram[60].getAndIncrement();
+ }
+ }
+
+ private void unrecordInHistogram(float costTimeInSeconds) {
+ int bucket = (int) costTimeInSeconds;
+ if (bucket < 60) {
+ currentQueriesCostHistogram[bucket].getAndDecrement();
+ } else {
+ currentQueriesCostHistogram[60].getAndDecrement();
+ }
+ }
+
+ private void clearExpiredQueriesInfoTask() {
+ int queryCostStatWindow = CONFIG.getQueryCostStatWindow();
+ if (queryCostStatWindow <= 0) {
+ return;
+ }
+
+ // the QueryInfo smaller than expired time will be cleared
+ long expiredTime = System.currentTimeMillis() - queryCostStatWindow * 60 *
1_000L;
+ // peek head, the head QueryInfo is in the time window, return directly
+ QueryInfo queryInfo = currentQueriesInfo.peekFirst();
+ if (queryInfo == null || queryInfo.endTime >= expiredTime) {
+ return;
+ }
+
+ queryInfo = currentQueriesInfo.poll();
+ while (queryInfo != null) {
+ if (queryInfo.endTime < expiredTime) {
+ // out of time window, clear queryInfo
+
coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfo));
+ unrecordInHistogram(queryInfo.costTime);
+ queryInfo = currentQueriesInfo.poll();
+ } else {
+ // the head of the queue is not expired, add back
+ currentQueriesInfo.addFirst(queryInfo);
+ // there is no more candidate to clear
+ return;
+ }
+ }
+ }
+
+ public List<StatedQueriesInfo> getCurrentQueriesInfo() {
+ List<IQueryExecution> runningQueries = getAllQueryExecutions();
+ Set<String> runningQueryIdSet =
+
runningQueries.stream().map(IQueryExecution::getQueryId).collect(Collectors.toSet());
+ List<StatedQueriesInfo> result = new ArrayList<>();
+
+ // add History queries (satisfy the time window) info
+ Iterator<QueryInfo> historyQueriesIterator = currentQueriesInfo.iterator();
+ Set<String> repetitionQueryIdSet = new HashSet<>();
+ long currentTime = System.currentTimeMillis();
+ long needRecordTime = currentTime - CONFIG.getQueryCostStatWindow() * 60 *
1_000L;
+ while (historyQueriesIterator.hasNext()) {
+ QueryInfo queryInfo = historyQueriesIterator.next();
+ if (queryInfo.endTime < needRecordTime) {
+ // out of time window, ignore it
+ } else {
+ if (runningQueryIdSet.contains(queryInfo.queryId)) {
+ repetitionQueryIdSet.add(queryInfo.queryId);
+ }
+ result.add(new StatedQueriesInfo(QueryState.FINISHED, queryInfo));
+ }
+ }
+
+ // add Running queries info after remove the repetitions which has
recorded in History queries
+ result.addAll(
+ runningQueries.stream()
+ .filter(queryExecution ->
!repetitionQueryIdSet.contains(queryExecution.getQueryId()))
+ .map(
+ queryExecution ->
+ new StatedQueriesInfo(
+ QueryState.RUNNING,
+ queryExecution.getQueryId(),
+ queryExecution.getStartExecutionTime(),
+ DEFAULT_END_TIME,
+ (currentTime - queryExecution.getStartExecutionTime())
/ 1000,
+ queryExecution.getExecuteSQL().orElse("UNKNOWN"),
+ queryExecution.getUser(),
+ queryExecution.getClientHostname()))
+ .collect(Collectors.toList()));
+ return result;
+ }
+
+ public int[] getCurrentQueriesCostHistogram() {
+ return
Arrays.stream(currentQueriesCostHistogram).mapToInt(AtomicInteger::get).toArray();
+ }
+
+ public static class QueryInfo implements Accountable {
+ public static final long DEFAULT_END_TIME = -1L;
+ private static final long INSTANCE_SIZE =
shallowSizeOfInstance(QueryInfo.class);
+
+ private final String queryId;
+
+ // unit: millisecond
+ private final long startTime;
+ private final long endTime;
+ // unit: second
+ private final float costTime;
+
+ private final String statement;
+ private final String user;
+ private final String clientHost;
+
+ public QueryInfo(
+ String queryId,
+ long startTime,
+ long endTime,
+ float costTime,
+ String statement,
+ String user,
+ String clientHost) {
+ this.queryId = queryId;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.costTime = costTime;
+ this.statement = statement;
+ this.user = user;
+ this.clientHost = clientHost;
+ }
+
+ public String getClientHost() {
+ return clientHost;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getEndTime() {
+ return endTime;
+ }
+
+ public float getCostTime() {
+ return costTime;
+ }
+
+ public String getQueryId() {
+ return queryId;
+ }
+
+ public String getStatement() {
+ return statement;
+ }
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ + sizeOfCharArray(statement.length())
+ + sizeOfCharArray(user.length())
+ + sizeOfCharArray(clientHost.length());
+ }
+ }
+
+ public static class StatedQueriesInfo extends QueryInfo {
+ private final QueryState queryState;
+
+ private StatedQueriesInfo(QueryState queryState, QueryInfo queryInfo) {
+ super(
+ queryInfo.queryId,
+ queryInfo.startTime,
+ queryInfo.endTime,
+ queryInfo.costTime,
+ queryInfo.statement,
+ queryInfo.user,
+ queryInfo.clientHost);
+ this.queryState = queryState;
+ }
+
+ private StatedQueriesInfo(
+ QueryState queryState,
+ String queryId,
+ long startTime,
+ long endTime,
+ long costTime,
+ String statement,
+ String user,
+ String clientHost) {
+ super(queryId, startTime, endTime, costTime, statement, user,
clientHost);
+ this.queryState = queryState;
+ }
+
+ public String getQueryState() {
+ return queryState.name();
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
index 98257c24293..e98f016767f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
@@ -77,4 +77,6 @@ public interface IQueryExecution {
IClientSession.SqlDialect getSQLDialect();
String getUser();
+
+ String getClientHostname();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 2c1657e839e..4734db5850a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -700,6 +700,11 @@ public class QueryExecution implements IQueryExecution {
return context.getSession().getUserName();
}
+ @Override
+ public String getClientHostname() {
+ return context.getCliHostname();
+ }
+
public MPPQueryContext getContext() {
return context;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
index 1880924f297..823a620820f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
@@ -353,4 +353,8 @@ public class ConfigExecution implements IQueryExecution {
public String getUser() {
return context.getSession().getUserName();
}
+
+ public String getClientHostname() {
+ return context.getCliHostname();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java
index f8cf497546e..d7d755ddc1d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java
@@ -86,6 +86,8 @@ public class DataNodeLocationSupplierFactory {
switch (tableName) {
case InformationSchema.QUERIES:
case InformationSchema.CONNECTIONS:
+ case InformationSchema.CURRENT_QUERIES:
+ case InformationSchema.QUERIES_COSTS_HISTOGRAM:
return getReadableDataNodeLocations();
case InformationSchema.DATABASES:
case InformationSchema.TABLES:
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
index 4ca38a5c8d9..8fc7d01437c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
@@ -1910,5 +1910,10 @@ public class MergeTreeSortOperatorTest {
public boolean isUserQuery() {
return false;
}
+
+ @Override
+ public String getClientHostname() {
+ return SessionConfig.DEFAULT_HOST;
+ }
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PlanTester.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PlanTester.java
index a4cef177d81..85075570201 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PlanTester.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/PlanTester.java
@@ -83,6 +83,8 @@ public class PlanTester {
public List<TDataNodeLocation> getDataNodeLocations(String table) {
switch (table) {
case "queries":
+ case "current_queries":
+ case "queries_costs_histogram":
return ImmutableList.of(
genDataNodeLocation(1, "192.0.1.1"), genDataNodeLocation(2,
"192.0.1.2"));
default:
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/CurrentQueriesTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/CurrentQueriesTest.java
new file mode 100644
index 00000000000..1e3163321ec
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/CurrentQueriesTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package
org.apache.iotdb.db.queryengine.plan.relational.planner.informationschema;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.PlanTester;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.BIN;
+import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.CLIENT_IP;
+import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.COST_TIME;
+import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.DATA_NODE_ID_TABLE_MODEL;
+import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.END_TIME_TABLE_MODEL;
+import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.NUMS;
+import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.QUERY_ID_TABLE_MODEL;
+import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.START_TIME_TABLE_MODEL;
+import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATEMENT_TABLE_MODEL;
+import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATE_TABLE_MODEL;
+import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.USER_TABLE_MODEL;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.collect;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.infoSchemaTableScan;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output;
+
+public class CurrentQueriesTest {
+ private final PlanTester planTester = new PlanTester();
+
+ @Test
+ public void testCurrentQueries() {
+ LogicalQueryPlan logicalQueryPlan =
+ planTester.createPlan("select * from
information_schema.current_queries");
+ assertPlan(
+ logicalQueryPlan,
+ output(
+ infoSchemaTableScan(
+ "information_schema.current_queries",
+ Optional.empty(),
+ ImmutableList.of(
+ QUERY_ID_TABLE_MODEL,
+ STATE_TABLE_MODEL,
+ START_TIME_TABLE_MODEL,
+ END_TIME_TABLE_MODEL,
+ DATA_NODE_ID_TABLE_MODEL,
+ COST_TIME,
+ STATEMENT_TABLE_MODEL,
+ USER_TABLE_MODEL,
+ CLIENT_IP))));
+
+ // - Exchange
+ // Output - Collect - Exchange
+ assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(),
exchange())));
+ // TableScan
+ assertPlan(
+ planTester.getFragmentPlan(1),
+ infoSchemaTableScan("information_schema.current_queries",
Optional.of(1)));
+ // TableScan
+ assertPlan(
+ planTester.getFragmentPlan(2),
+ infoSchemaTableScan("information_schema.current_queries",
Optional.of(2)));
+ }
+
+ @Test
+ public void testQueriesCostsHistogram() {
+ LogicalQueryPlan logicalQueryPlan =
+ planTester.createPlan("select * from
information_schema.queries_costs_histogram");
+ assertPlan(
+ logicalQueryPlan,
+ output(
+ infoSchemaTableScan(
+ "information_schema.queries_costs_histogram",
+ Optional.empty(),
+ ImmutableList.of(BIN, NUMS))));
+
+ // - Exchange
+ // Output - Collect - Exchange
+ assertPlan(planTester.getFragmentPlan(0), output(collect(exchange(),
exchange())));
+ // TableScan
+ assertPlan(
+ planTester.getFragmentPlan(1),
+ infoSchemaTableScan("information_schema.queries_costs_histogram",
Optional.of(1)));
+ // TableScan
+ assertPlan(
+ planTester.getFragmentPlan(2),
+ infoSchemaTableScan("information_schema.queries_costs_histogram",
Optional.of(2)));
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ShowQueriesTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/ShowQueriesTest.java
similarity index 94%
rename from
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ShowQueriesTest.java
rename to
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/ShowQueriesTest.java
index 63b7b783d74..7161c68f4e9 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ShowQueriesTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/informationschema/ShowQueriesTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+package
org.apache.iotdb.db.queryengine.plan.relational.planner.informationschema;
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.queryengine.plan.relational.planner.PlanTester;
@@ -32,7 +32,9 @@ import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.ELAPSE
import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.QUERY_ID_TABLE_MODEL;
import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.START_TIME_TABLE_MODEL;
import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATEMENT;
+import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATEMENT_TABLE_MODEL;
import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.USER;
+import static
org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.USER_TABLE_MODEL;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.collect;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange;
@@ -61,8 +63,8 @@ public class ShowQueriesTest {
START_TIME_TABLE_MODEL,
DATA_NODE_ID_TABLE_MODEL,
ELAPSED_TIME_TABLE_MODEL,
- STATEMENT.toLowerCase(Locale.ENGLISH),
- USER.toLowerCase(Locale.ENGLISH)))));
+ STATEMENT_TABLE_MODEL,
+ USER_TABLE_MODEL))));
// - Exchange
// Output - Collect - Exchange
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 4b31bf9a286..6bbc4124a9f 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1080,6 +1080,12 @@ max_tsblock_line_number=1000
# Datatype: long
slow_query_threshold=10000
+# Time window threshold(min) for record of history queries.
+# effectiveMode: hot_reload
+# Datatype: int
+# Privilege: SYSTEM
+query_cost_stat_window=0
+
# The max executing time of query. unit: ms
# effectiveMode: restart
# Datatype: int
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 390e9f80e9b..6f9f95ca8fe 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -35,6 +35,7 @@ public enum ThreadName {
FRAGMENT_INSTANCE_NOTIFICATION("Fragment-Instance-Notification"),
FRAGMENT_INSTANCE_DISPATCH("Fragment-Instance-Dispatch"),
DRIVER_TASK_SCHEDULER_NOTIFICATION("Driver-Task-Scheduler-Notification"),
+ EXPIRED_QUERIES_INFO_CLEAR("Expired-Queries-Info-Clear"),
// -------------------------- MPP --------------------------
MPP_COORDINATOR_SCHEDULED_EXECUTOR("MPP-Coordinator-Scheduled-Executor"),
MPP_DATA_EXCHANGE_TASK_EXECUTOR("MPP-Data-Exchange-Task-Executors"),
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
index 67850d991cb..0459d4d2c86 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
@@ -214,11 +214,19 @@ public class ColumnHeaderConstant {
public static final String CLIENT_IP = "client_ip";
public static final String QUERY_ID_TABLE_MODEL = "query_id";
- public static final String QUERY_ID_START_TIME_TABLE_MODEL = "start_time";
public static final String DATA_NODE_ID_TABLE_MODEL = "datanode_id";
public static final String START_TIME_TABLE_MODEL = "start_time";
public static final String ELAPSED_TIME_TABLE_MODEL = "elapsed_time";
+ // column names for current_queries and queries_costs_histogram
+ public static final String STATE_TABLE_MODEL = "state";
+ public static final String END_TIME_TABLE_MODEL = "end_time";
+ public static final String COST_TIME = "cost_time";
+ public static final String STATEMENT_TABLE_MODEL = "statement";
+ public static final String USER_TABLE_MODEL = "user";
+ public static final String BIN = "bin";
+ public static final String NUMS = "nums";
+
public static final String TABLE_NAME_TABLE_MODEL = "table_name";
public static final String TABLE_TYPE_TABLE_MODEL = "table_type";
public static final String COLUMN_NAME_TABLE_MODEL = "column_name";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
index 243bc41c40c..b8e03423d61 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
@@ -50,6 +50,8 @@ public class InformationSchema {
public static final String CONFIG_NODES = "config_nodes";
public static final String DATA_NODES = "data_nodes";
public static final String CONNECTIONS = "connections";
+ public static final String CURRENT_QUERIES = "current_queries";
+ public static final String QUERIES_COSTS_HISTOGRAM =
"queries_costs_histogram";
static {
final TsTable queriesTable = new TsTable(QUERIES);
@@ -57,17 +59,15 @@ public class InformationSchema {
new TagColumnSchema(ColumnHeaderConstant.QUERY_ID_TABLE_MODEL,
TSDataType.STRING));
queriesTable.addColumnSchema(
new AttributeColumnSchema(
- ColumnHeaderConstant.QUERY_ID_START_TIME_TABLE_MODEL,
TSDataType.TIMESTAMP));
+ ColumnHeaderConstant.START_TIME_TABLE_MODEL,
TSDataType.TIMESTAMP));
queriesTable.addColumnSchema(
new
AttributeColumnSchema(ColumnHeaderConstant.DATA_NODE_ID_TABLE_MODEL,
TSDataType.INT32));
queriesTable.addColumnSchema(
new
AttributeColumnSchema(ColumnHeaderConstant.ELAPSED_TIME_TABLE_MODEL,
TSDataType.FLOAT));
queriesTable.addColumnSchema(
- new AttributeColumnSchema(
- ColumnHeaderConstant.STATEMENT.toLowerCase(Locale.ENGLISH),
TSDataType.STRING));
+ new AttributeColumnSchema(ColumnHeaderConstant.STATEMENT_TABLE_MODEL,
TSDataType.STRING));
queriesTable.addColumnSchema(
- new AttributeColumnSchema(
- ColumnHeaderConstant.USER.toLowerCase(Locale.ENGLISH),
TSDataType.STRING));
+ new AttributeColumnSchema(ColumnHeaderConstant.USER_TABLE_MODEL,
TSDataType.STRING));
queriesTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME);
schemaTables.put(QUERIES, queriesTable);
@@ -361,6 +361,37 @@ public class InformationSchema {
new AttributeColumnSchema(ColumnHeaderConstant.CLIENT_IP,
TSDataType.STRING));
connectionsTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME);
schemaTables.put(CONNECTIONS, connectionsTable);
+
+ final TsTable currentQueriesTable = new TsTable(CURRENT_QUERIES);
+ currentQueriesTable.addColumnSchema(
+ new TagColumnSchema(ColumnHeaderConstant.QUERY_ID_TABLE_MODEL,
TSDataType.STRING));
+ currentQueriesTable.addColumnSchema(
+ new AttributeColumnSchema(ColumnHeaderConstant.STATE_TABLE_MODEL,
TSDataType.STRING));
+ currentQueriesTable.addColumnSchema(
+ new AttributeColumnSchema(
+ ColumnHeaderConstant.START_TIME_TABLE_MODEL,
TSDataType.TIMESTAMP));
+ currentQueriesTable.addColumnSchema(
+ new AttributeColumnSchema(ColumnHeaderConstant.END_TIME_TABLE_MODEL,
TSDataType.TIMESTAMP));
+ currentQueriesTable.addColumnSchema(
+ new
AttributeColumnSchema(ColumnHeaderConstant.DATA_NODE_ID_TABLE_MODEL,
TSDataType.INT32));
+ currentQueriesTable.addColumnSchema(
+ new AttributeColumnSchema(ColumnHeaderConstant.COST_TIME,
TSDataType.FLOAT));
+ currentQueriesTable.addColumnSchema(
+ new AttributeColumnSchema(ColumnHeaderConstant.STATEMENT_TABLE_MODEL,
TSDataType.STRING));
+ currentQueriesTable.addColumnSchema(
+ new AttributeColumnSchema(ColumnHeaderConstant.USER_TABLE_MODEL,
TSDataType.STRING));
+ currentQueriesTable.addColumnSchema(
+ new AttributeColumnSchema(ColumnHeaderConstant.CLIENT_IP,
TSDataType.STRING));
+ currentQueriesTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME);
+ schemaTables.put(CURRENT_QUERIES, currentQueriesTable);
+
+ final TsTable queriesCostsHistogramTable = new
TsTable(QUERIES_COSTS_HISTOGRAM);
+ queriesCostsHistogramTable.addColumnSchema(
+ new TagColumnSchema(ColumnHeaderConstant.BIN, TSDataType.STRING));
+ queriesCostsHistogramTable.addColumnSchema(
+ new AttributeColumnSchema(ColumnHeaderConstant.NUMS,
TSDataType.INT32));
+ queriesCostsHistogramTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME);
+ schemaTables.put(QUERIES_COSTS_HISTOGRAM, queriesCostsHistogramTable);
}
public static Map<String, TsTable> getSchemaTables() {