This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1746cdb28b1 Add system table named connections to resolve the idle
session can be found (#16846)
1746cdb28b1 is described below
commit 1746cdb28b135d122ae2138ac411af4c8914b600
Author: libo <[email protected]>
AuthorDate: Fri Dec 5 14:32:52 2025 +0800
Add system table named connections to resolve the idle session can be found
(#16846)
---
.../relational/it/schema/IoTDBDatabaseIT.java | 4 +-
.../iotdb/session/it/IoTDBConnectionsIT.java | 364 +++++++++++++++++++++
.../iotdb/db/protocol/session/IClientSession.java | 16 +
.../iotdb/db/protocol/session/SessionManager.java | 14 +
.../db/queryengine/common/ConnectionInfo.java | 64 ++++
.../InformationSchemaContentSupplierFactory.java | 39 +++
.../DataNodeLocationSupplierFactory.java | 1 +
.../schema/column/ColumnHeaderConstant.java | 8 +
.../commons/schema/table/InformationSchema.java | 17 +
9 files changed, 526 insertions(+), 1 deletion(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
index 4987b6d4b0c..e04ff838819 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java
@@ -398,6 +398,7 @@ public class IoTDBDatabaseIT {
"columns,INF,",
"config_nodes,INF,",
"configurations,INF,",
+ "connections,INF,",
"data_nodes,INF,",
"databases,INF,",
"functions,INF,",
@@ -644,12 +645,13 @@ public class IoTDBDatabaseIT {
"information_schema,nodes,INF,USING,null,SYSTEM VIEW,",
"information_schema,config_nodes,INF,USING,null,SYSTEM
VIEW,",
"information_schema,data_nodes,INF,USING,null,SYSTEM VIEW,",
+ "information_schema,connections,INF,USING,null,SYSTEM VIEW,",
"test,test,INF,USING,test,BASE TABLE,",
"test,view_table,100,USING,null,VIEW FROM TREE,")));
TestUtils.assertResultSetEqual(
statement.executeQuery("count devices from tables where status =
'USING'"),
"count(devices),",
- Collections.singleton("19,"));
+ Collections.singleton("20,"));
TestUtils.assertResultSetEqual(
statement.executeQuery(
"select * from columns where table_name = 'queries' or database
= 'test'"),
diff --git
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java
new file mode 100644
index 00000000000..bc043c79e8a
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.it;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.itbase.env.BaseEnv;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.createUser;
+import static org.apache.iotdb.itbase.env.BaseEnv.TABLE_SQL_DIALECT;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBConnectionsIT {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBConnectionsIT.class);
+ private static final String SHOW_DATANODES = "show datanodes";
+ private static final int COLUMN_AMOUNT = 6;
+ private static Set<Integer> allDataNodeId = new HashSet<>();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().getConfig().getCommonConfig();
+ EnvFactory.getEnv().initClusterEnvironment(1, 2);
+ createUser("test", "test123123456");
+ try (Connection connection = EnvFactory.getEnv().getTableConnection();
+ Statement statement = connection.createStatement()) {
+ // Get all data nodes
+ ResultSet result = statement.executeQuery(SHOW_DATANODES);
+ while (result.next()) {
+ allDataNodeId.add(result.getInt(ColumnHeaderConstant.NODE_ID));
+ }
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ // Create two connections on the different datanode, validate normal test
case.
+ @Test
+ public void testDifferentDataNodeGetConnections() {
+ Connection conn = null;
+ int dataNodeId = (int) allDataNodeId.toArray()[0];
+ // Create the first connection on the datanode.
+ try {
+ Connection connection =
+ EnvFactory.getEnv()
+ .getConnection(
+ EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).get(),
+
CommonDescriptor.getInstance().getConfig().getDefaultAdminName(),
+
CommonDescriptor.getInstance().getConfig().getAdminPassword(),
+ BaseEnv.TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement();
+ statement.execute("USE information_schema");
+ ResultSet resultSet = statement.executeQuery("SELECT * FROM
connections");
+ if (!resultSet.next()) {
+ fail();
+ }
+
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ Assert.assertEquals(COLUMN_AMOUNT, metaData.getColumnCount());
+ while (resultSet.next()) {
+ LOGGER.info(
+ "{}, {}, {}, {}, {}, {}",
+ resultSet.getString(1),
+ resultSet.getString(2),
+ resultSet.getString(3),
+ resultSet.getString(4),
+ resultSet.getString(5),
+ resultSet.getString(6));
+ }
+
+ conn = connection;
+ } catch (Exception e) {
+ LOGGER.error("{}", e.getMessage(), e);
+ fail(e.getMessage());
+ }
+
+ int anotherDataNodeId = (int) allDataNodeId.toArray()[1];
+ // Create the second connection on the datanode.
+ try (Connection connection1 =
+ EnvFactory.getEnv()
+ .getConnection(
+
EnvFactory.getEnv().dataNodeIdToWrapper(anotherDataNodeId).get(),
+
CommonDescriptor.getInstance().getConfig().getDefaultAdminName(),
+
CommonDescriptor.getInstance().getConfig().getAdminPassword(),
+ BaseEnv.TABLE_SQL_DIALECT);
+ Statement statement1 = connection1.createStatement()) {
+ statement1.execute("USE information_schema");
+ ResultSet resultSet1 = statement1.executeQuery("SELECT COUNT(*) FROM
connections");
+ if (!resultSet1.next()) {
+ fail();
+ }
+
+ while (resultSet1.next()) {
+ // Before close the first connection, the current record count must be
two.
+ Assert.assertEquals(2, resultSet1.getInt(1));
+ }
+
+ conn.close();
+
+ ResultSet resultSet2 = statement1.executeQuery("SELECT COUNT(*) FROM
connections");
+ if (!resultSet2.next()) {
+ fail();
+ }
+
+ while (resultSet2.next()) {
+ // After close the first connection, the current record count change
into one.
+ Assert.assertEquals(1, resultSet2.getInt(1));
+ }
+ } catch (Exception e) {
+ LOGGER.error("{}", e.getMessage(), e);
+ fail(e.getMessage());
+ }
+ }
+
+ // Create two connections on the same datanode, validate normal test case.
+ @Test
+ public void testSameDataNodeGetConnections() {
+ Connection conn = null;
+ int dataNodeId = (int) allDataNodeId.toArray()[0];
+ try (Connection connection =
+ EnvFactory.getEnv()
+ .getConnection(
+ EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).get(),
+
CommonDescriptor.getInstance().getConfig().getDefaultAdminName(),
+
CommonDescriptor.getInstance().getConfig().getAdminPassword(),
+ BaseEnv.TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ statement.execute("USE information_schema");
+
+ ResultSet resultSet =
+ statement.executeQuery(
+ "SELECT * FROM connections WHERE data_node_id = '" + dataNodeId
+ "'");
+ if (!resultSet.next()) {
+ fail();
+ }
+
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ Assert.assertEquals(COLUMN_AMOUNT, metaData.getColumnCount());
+ while (resultSet.next()) {
+ LOGGER.info(
+ "{}, {}, {}, {}, {}, {}",
+ resultSet.getString(1),
+ resultSet.getString(2),
+ resultSet.getString(3),
+ resultSet.getString(4),
+ resultSet.getTimestamp(5),
+ resultSet.getString(6));
+ }
+
+ conn = connection;
+ } catch (Exception e) {
+ LOGGER.error("{}", e.getMessage(), e);
+ fail(e.getMessage());
+ }
+
+ // Create the second connection on the same datanode.
+ try (Connection connection1 =
+ EnvFactory.getEnv()
+ .getConnection(
+ EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).get(),
+
CommonDescriptor.getInstance().getConfig().getDefaultAdminName(),
+
CommonDescriptor.getInstance().getConfig().getAdminPassword(),
+ BaseEnv.TABLE_SQL_DIALECT);
+ Statement statement1 = connection1.createStatement()) {
+ statement1.execute("USE information_schema");
+ ResultSet resultSet1 = statement1.executeQuery("SELECT COUNT(*) FROM
connections");
+ if (!resultSet1.next()) {
+ fail();
+ }
+
+ while (resultSet1.next()) {
+ // Before close the first connection, the current record count must be
two.
+ Assert.assertEquals(2, resultSet1.getInt(1));
+ }
+
+ conn.close();
+
+ ResultSet resultSet2 = statement1.executeQuery("SELECT COUNT(*) FROM
connections");
+ if (!resultSet2.next()) {
+ fail();
+ }
+
+ while (resultSet2.next()) {
+ // After close the first connection, the current record count change
into one.
+ Assert.assertEquals(1, resultSet2.getInt(1));
+ }
+ } catch (Exception e) {
+ LOGGER.error("{}", e.getMessage(), e);
+ fail(e.getMessage());
+ }
+ }
+
+ // Validate normal test case when close one datanode.
+ @Test
+ public void testClosedDataNodeGetConnections() throws Exception {
+ if (allDataNodeId.size() <= 1) {
+ return;
+ }
+ int closedDataNodeId = (int) allDataNodeId.toArray()[0];
+ try (Connection connection =
+ EnvFactory.getEnv()
+ .getConnection(
+
EnvFactory.getEnv().dataNodeIdToWrapper(closedDataNodeId).get(),
+
CommonDescriptor.getInstance().getConfig().getDefaultAdminName(),
+
CommonDescriptor.getInstance().getConfig().getAdminPassword(),
+ BaseEnv.TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ statement.execute("USE information_schema");
+
+ ResultSet resultSet =
+ statement.executeQuery(
+ "SELECT COUNT(*) FROM connections WHERE data_node_id = '" +
closedDataNodeId + "'");
+ if (!resultSet.next()) {
+ fail();
+ }
+ // All records corresponding the datanode exist Before close the
datanode. Validate result
+ // larger than zero.
+ Assert.assertTrue(resultSet.getInt(1) > 0);
+ } catch (Exception e) {
+ LOGGER.error("{}", e.getMessage(), e);
+ fail(e.getMessage());
+ }
+
+ // close the number closedDataNodeId datanode
+ EnvFactory.getEnv().dataNodeIdToWrapper(closedDataNodeId).get().stop();
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+
+ // Wait for shutdown check
+ while (true) {
+ AtomicBoolean containUnknown = new AtomicBoolean(false);
+ TShowDataNodesResp showDataNodesResp = client.showDataNodes();
+ showDataNodesResp
+ .getDataNodesInfoList()
+ .forEach(
+ dataNodeInfo -> {
+ if
(NodeStatus.Unknown.getStatus().equals(dataNodeInfo.getStatus())) {
+ containUnknown.set(true);
+ }
+ });
+
+ if (containUnknown.get()) {
+ break;
+ }
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+
+ int activeDataNodeId = (int) allDataNodeId.toArray()[1];
+ try (Connection connection =
+ EnvFactory.getEnv()
+ .getConnection(
+
EnvFactory.getEnv().dataNodeIdToWrapper(activeDataNodeId).get(),
+
CommonDescriptor.getInstance().getConfig().getDefaultAdminName(),
+
CommonDescriptor.getInstance().getConfig().getAdminPassword(),
+ BaseEnv.TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ statement.execute("USE information_schema");
+
+ ResultSet resultSet =
+ statement.executeQuery(
+ "SELECT COUNT(*) FROM connections WHERE data_node_id = '" +
closedDataNodeId + "'");
+ if (!resultSet.next()) {
+ fail();
+ }
+ // All records corresponding the datanode will be cleared After close
the datanode. Validate
+ // result if it is zero.
+ Assert.assertEquals(0, resultSet.getLong(1));
+ } catch (Exception e) {
+ LOGGER.error("{}", e.getMessage(), e);
+ fail(e.getMessage());
+ }
+
+ // revert environment
+ EnvFactory.getEnv().dataNodeIdToWrapper(closedDataNodeId).get().start();
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ // Wait for restart check
+ while (true) {
+ AtomicBoolean containUnknown = new AtomicBoolean(false);
+ TShowDataNodesResp showDataNodesResp = client.showDataNodes();
+ showDataNodesResp
+ .getDataNodesInfoList()
+ .forEach(
+ dataNodeInfo -> {
+ if
(NodeStatus.Unknown.getStatus().equals(dataNodeInfo.getStatus())) {
+ containUnknown.set(true);
+ }
+ });
+
+ if (!containUnknown.get()) {
+ break;
+ }
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+ }
+
+ @Test
+ public void testNoAuthUserGetConnections() {
+ try (Connection connection =
+ EnvFactory.getEnv().getConnection("test", "test123123456",
TABLE_SQL_DIALECT);
+ Statement statement = connection.createStatement()) {
+ statement.execute("USE information_schema");
+ ResultSet resultSet = statement.executeQuery("SELECT * FROM
connections");
+ if (!resultSet.next()) {
+ fail();
+ }
+ ResultSetMetaData metaData = resultSet.getMetaData();
+ Assert.assertEquals(COLUMN_AMOUNT, metaData.getColumnCount());
+ } catch (SQLException e) {
+ Assert.assertEquals(
+ "803: Access Denied: No permissions for this operation, please add
privilege SYSTEM",
+ e.getMessage());
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
index 21d4121ffd3..351806de099 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.protocol.session;
import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion;
+import org.apache.iotdb.db.queryengine.common.ConnectionInfo;
import org.apache.iotdb.service.rpc.thrift.TSConnectionInfo;
import org.apache.iotdb.service.rpc.thrift.TSConnectionType;
@@ -56,6 +57,8 @@ public abstract class IClientSession {
@Nullable private String databaseName;
+ private long lastActiveTime;
+
public abstract String getClientAddress();
public abstract int getClientPort();
@@ -140,6 +143,11 @@ public abstract class IClientSession {
getUsername(), getLogInTime(), getConnectionId(), getConnectionType());
}
+ public ConnectionInfo convertToConnectionInfo() {
+ return new ConnectionInfo(
+ getUserId(), getUsername(), getId(), getLastActiveTime(),
getClientAddress());
+ }
+
/**
* statementIds that this client opens.<br>
* For JDBC clients, each Statement instance has a statement id.<br>
@@ -180,6 +188,14 @@ public abstract class IClientSession {
this.databaseName = databaseName;
}
+ public long getLastActiveTime() {
+ return lastActiveTime;
+ }
+
+ public void setLastActiveTime(long lastActiveTime) {
+ this.lastActiveTime = lastActiveTime;
+ }
+
public enum SqlDialect {
TREE((byte) 0),
TABLE((byte) 1);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
index d098cf70223..a4a28efa5ec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.auth.LoginLockManager;
import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp;
import org.apache.iotdb.db.protocol.thrift.OperationType;
+import org.apache.iotdb.db.queryengine.common.ConnectionInfo;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.QueryResourceManager;
import org.apache.iotdb.db.utils.DataNodeAuthUtils;
@@ -58,6 +59,7 @@ import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Comparator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
@@ -386,6 +388,10 @@ public class SessionManager implements SessionManagerMBean
{
/** update connection idle time after execution. */
public void updateIdleTime() {
currSessionIdleTime.set(System.nanoTime());
+ IClientSession session = currSession.get();
+ if (session != null) {
+ session.setLastActiveTime(CommonDateTimeUtils.currentTime());
+ }
}
public TimeZone getSessionTimeZone() {
@@ -538,6 +544,14 @@ public class SessionManager implements SessionManagerMBean
{
.collect(Collectors.toList()));
}
+ public List<ConnectionInfo> getAllSessionConnectionInfo() {
+ return sessions.keySet().stream()
+ .filter(s -> StringUtils.isNotEmpty(s.getUsername()))
+ .map(IClientSession::convertToConnectionInfo)
+ .sorted(Comparator.comparingLong(ConnectionInfo::getLastActiveTime))
+ .collect(Collectors.toList());
+ }
+
private static class SessionManagerHelper {
private static final SessionManager INSTANCE = new SessionManager();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/ConnectionInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/ConnectionInfo.java
new file mode 100644
index 00000000000..3889e533187
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/ConnectionInfo.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.common;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+public class ConnectionInfo {
+ private static final int dataNodeId =
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
+ private final long userId;
+ private final String userName;
+ private final long sessionId;
+ private final long lastActiveTime;
+ private final String clientAddress;
+
+ public ConnectionInfo(
+ long userId, String userName, long sessionId, long lastActiveTime,
String clientAddress) {
+ this.userId = userId;
+ this.userName = userName;
+ this.sessionId = sessionId;
+ this.lastActiveTime = lastActiveTime;
+ this.clientAddress = clientAddress;
+ }
+
+ public int getDataNodeId() {
+ return dataNodeId;
+ }
+
+ public long getUserId() {
+ return userId;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public long getSessionId() {
+ return sessionId;
+ }
+
+ public long getLastActiveTime() {
+ return lastActiveTime;
+ }
+
+ public String getClientAddress() {
+ return clientAddress;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
index 1449b565c0e..fc688816565 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
@@ -71,6 +71,8 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.protocol.client.ainode.AINodeClient;
import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager;
import org.apache.iotdb.db.protocol.session.IClientSession;
+import org.apache.iotdb.db.protocol.session.SessionManager;
+import org.apache.iotdb.db.queryengine.common.ConnectionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowCreateViewTask;
@@ -126,6 +128,9 @@ import static
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.Sho
import static
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowPipePluginsTask.PIPE_PLUGIN_TYPE_EXTERNAL;
public class InformationSchemaContentSupplierFactory {
+
+ private static final SessionManager sessionManager =
SessionManager.getInstance();
+
private InformationSchemaContentSupplierFactory() {}
public static Iterator<TsBlock> getSupplier(
@@ -166,6 +171,8 @@ public class InformationSchemaContentSupplierFactory {
return new ConfigNodesSupplier(dataTypes, userEntity);
case InformationSchema.DATA_NODES:
return new DataNodesSupplier(dataTypes, userEntity);
+ case InformationSchema.CONNECTIONS:
+ return new ConnectionsSupplier(dataTypes, userEntity);
default:
throw new UnsupportedOperationException("Unknown table: " +
tableName);
}
@@ -1255,4 +1262,36 @@ public class InformationSchemaContentSupplierFactory {
protected abstract void constructLine();
}
+
+ private static class ConnectionsSupplier extends TsBlockSupplier {
+ private Iterator<ConnectionInfo> sessionConnectionIterator;
+
+ private ConnectionsSupplier(final List<TSDataType> dataTypes, final
UserEntity userEntity) {
+ super(dataTypes);
+ accessControl.checkUserGlobalSysPrivilege(userEntity);
+ sessionConnectionIterator =
sessionManager.getAllSessionConnectionInfo().iterator();
+ }
+
+ @Override
+ protected void constructLine() {
+ ConnectionInfo connectionInfo = sessionConnectionIterator.next();
+ columnBuilders[0].writeBinary(
+ new Binary(String.valueOf(connectionInfo.getDataNodeId()),
TSFileConfig.STRING_CHARSET));
+ columnBuilders[1].writeBinary(
+ new Binary(String.valueOf(connectionInfo.getUserId()),
TSFileConfig.STRING_CHARSET));
+ columnBuilders[2].writeBinary(
+ new Binary(String.valueOf(connectionInfo.getSessionId()),
TSFileConfig.STRING_CHARSET));
+ columnBuilders[3].writeBinary(
+ new Binary(connectionInfo.getUserName(),
TSFileConfig.STRING_CHARSET));
+ columnBuilders[4].writeLong(connectionInfo.getLastActiveTime());
+ columnBuilders[5].writeBinary(
+ new Binary(connectionInfo.getClientAddress(),
TSFileConfig.STRING_CHARSET));
+ resultBuilder.declarePosition();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return sessionConnectionIterator.hasNext();
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java
index b2e384fbd44..4676559bd7b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java
@@ -85,6 +85,7 @@ public class DataNodeLocationSupplierFactory {
public List<TDataNodeLocation> getDataNodeLocations(final String
tableName) {
switch (tableName) {
case InformationSchema.QUERIES:
+ case InformationSchema.CONNECTIONS:
return getReadableDataNodeLocations();
case InformationSchema.DATABASES:
case InformationSchema.TABLES:
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
index 4f78ad9d5ea..943bdeb9cba 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java
@@ -205,6 +205,14 @@ public class ColumnHeaderConstant {
public static final String ELAPSED_TIME = "ElapsedTime";
public static final String STATEMENT = "Statement";
+ // column names for show idle connection
+ public static final String DATANODE_ID = "data_node_id";
+ public static final String USERID = "user_id";
+ public static final String SESSION_ID = "session_id";
+ public static final String USER_NAME = "user_name";
+ public static final String LAST_ACTIVE_TIME = "last_active_time";
+ public static final String CLIENT_IP = "client_ip";
+
public static final String QUERY_ID_TABLE_MODEL = "query_id";
public static final String QUERY_ID_START_TIME_TABLE_MODEL = "start_time";
public static final String DATA_NODE_ID_TABLE_MODEL = "datanode_id";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
index c58ad14989c..2db41cc3c2d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java
@@ -50,6 +50,7 @@ public class InformationSchema {
public static final String NODES = "nodes";
public static final String CONFIG_NODES = "config_nodes";
public static final String DATA_NODES = "data_nodes";
+ public static final String CONNECTIONS = "connections";
static {
final TsTable queriesTable = new TsTable(QUERIES);
@@ -362,6 +363,22 @@ public class InformationSchema {
ColumnHeaderConstant.SCHEMA_CONSENSUS_PORT_TABLE_MODEL,
TSDataType.INT32));
dataNodesTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME);
schemaTables.put(DATA_NODES, dataNodesTable);
+
+ final TsTable connectionsTable = new TsTable(CONNECTIONS);
+ connectionsTable.addColumnSchema(
+ new TagColumnSchema(ColumnHeaderConstant.DATANODE_ID,
TSDataType.STRING));
+ connectionsTable.addColumnSchema(
+ new TagColumnSchema(ColumnHeaderConstant.USERID, TSDataType.STRING));
+ connectionsTable.addColumnSchema(
+ new TagColumnSchema(ColumnHeaderConstant.SESSION_ID,
TSDataType.STRING));
+ connectionsTable.addColumnSchema(
+ new AttributeColumnSchema(ColumnHeaderConstant.USER_NAME,
TSDataType.STRING));
+ connectionsTable.addColumnSchema(
+ new AttributeColumnSchema(ColumnHeaderConstant.LAST_ACTIVE_TIME,
TSDataType.TIMESTAMP));
+ connectionsTable.addColumnSchema(
+ new AttributeColumnSchema(ColumnHeaderConstant.CLIENT_IP,
TSDataType.STRING));
+ connectionsTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME);
+ schemaTables.put(CONNECTIONS, connectionsTable);
}
public static Map<String, TsTable> getSchemaTables() {