This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 78a01cfeca4fd604941797273488646f92cd44f1 Author: libo <[email protected]> AuthorDate: Fri Dec 5 14:32:52 2025 +0800 Add system table named connections to resolve the idle session can be found (#16846) (cherry picked from commit 1746cdb28b135d122ae2138ac411af4c8914b600) --- .../relational/it/schema/IoTDBDatabaseIT.java | 4 +- .../iotdb/session/it/IoTDBConnectionsIT.java | 364 +++++++++++++++++++++ .../iotdb/db/protocol/session/IClientSession.java | 16 + .../iotdb/db/protocol/session/SessionManager.java | 14 + .../db/queryengine/common/ConnectionInfo.java | 64 ++++ .../InformationSchemaContentSupplierFactory.java | 39 +++ .../DataNodeLocationSupplierFactory.java | 1 + .../schema/column/ColumnHeaderConstant.java | 8 + .../commons/schema/table/InformationSchema.java | 17 + 9 files changed, 526 insertions(+), 1 deletion(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java index 4987b6d4b0c..e04ff838819 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java @@ -398,6 +398,7 @@ public class IoTDBDatabaseIT { "columns,INF,", "config_nodes,INF,", "configurations,INF,", + "connections,INF,", "data_nodes,INF,", "databases,INF,", "functions,INF,", @@ -644,12 +645,13 @@ public class IoTDBDatabaseIT { "information_schema,nodes,INF,USING,null,SYSTEM VIEW,", "information_schema,config_nodes,INF,USING,null,SYSTEM VIEW,", "information_schema,data_nodes,INF,USING,null,SYSTEM VIEW,", + "information_schema,connections,INF,USING,null,SYSTEM VIEW,", "test,test,INF,USING,test,BASE TABLE,", "test,view_table,100,USING,null,VIEW FROM TREE,"))); TestUtils.assertResultSetEqual( statement.executeQuery("count devices from tables where status = 'USING'"), "count(devices),", - Collections.singleton("19,")); + Collections.singleton("20,")); TestUtils.assertResultSetEqual( statement.executeQuery( "select * from columns where table_name = 'queries' or database = 'test'"), diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java new file mode 100644 index 00000000000..bc043c79e8a --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.session.it; + +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.TableClusterIT; +import org.apache.iotdb.itbase.category.TableLocalStandaloneIT; +import org.apache.iotdb.itbase.env.BaseEnv; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.iotdb.db.it.utils.TestUtils.createUser; +import static org.apache.iotdb.itbase.env.BaseEnv.TABLE_SQL_DIALECT; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({TableLocalStandaloneIT.class, TableClusterIT.class}) +public class IoTDBConnectionsIT { + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBConnectionsIT.class); + private static final String SHOW_DATANODES = "show datanodes"; + private static final int COLUMN_AMOUNT = 6; + private static Set<Integer> allDataNodeId = new HashSet<>(); + + @BeforeClass + public static void setUp() throws Exception { + EnvFactory.getEnv().getConfig().getCommonConfig(); + EnvFactory.getEnv().initClusterEnvironment(1, 2); + createUser("test", "test123123456"); + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + // Get all data nodes + ResultSet result = statement.executeQuery(SHOW_DATANODES); + while (result.next()) { + allDataNodeId.add(result.getInt(ColumnHeaderConstant.NODE_ID)); + } + } + } + + @AfterClass + public static void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + // Create two connections on the different datanode, validate normal test case. + @Test + public void testDifferentDataNodeGetConnections() { + Connection conn = null; + int dataNodeId = (int) allDataNodeId.toArray()[0]; + // Create the first connection on the datanode. + try { + Connection connection = + EnvFactory.getEnv() + .getConnection( + EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).get(), + CommonDescriptor.getInstance().getConfig().getDefaultAdminName(), + CommonDescriptor.getInstance().getConfig().getAdminPassword(), + BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement(); + statement.execute("USE information_schema"); + ResultSet resultSet = statement.executeQuery("SELECT * FROM connections"); + if (!resultSet.next()) { + fail(); + } + + ResultSetMetaData metaData = resultSet.getMetaData(); + Assert.assertEquals(COLUMN_AMOUNT, metaData.getColumnCount()); + while (resultSet.next()) { + LOGGER.info( + "{}, {}, {}, {}, {}, {}", + resultSet.getString(1), + resultSet.getString(2), + resultSet.getString(3), + resultSet.getString(4), + resultSet.getString(5), + resultSet.getString(6)); + } + + conn = connection; + } catch (Exception e) { + LOGGER.error("{}", e.getMessage(), e); + fail(e.getMessage()); + } + + int anotherDataNodeId = (int) allDataNodeId.toArray()[1]; + // Create the second connection on the datanode. + try (Connection connection1 = + EnvFactory.getEnv() + .getConnection( + EnvFactory.getEnv().dataNodeIdToWrapper(anotherDataNodeId).get(), + CommonDescriptor.getInstance().getConfig().getDefaultAdminName(), + CommonDescriptor.getInstance().getConfig().getAdminPassword(), + BaseEnv.TABLE_SQL_DIALECT); + Statement statement1 = connection1.createStatement()) { + statement1.execute("USE information_schema"); + ResultSet resultSet1 = statement1.executeQuery("SELECT COUNT(*) FROM connections"); + if (!resultSet1.next()) { + fail(); + } + + while (resultSet1.next()) { + // Before close the first connection, the current record count must be two. + Assert.assertEquals(2, resultSet1.getInt(1)); + } + + conn.close(); + + ResultSet resultSet2 = statement1.executeQuery("SELECT COUNT(*) FROM connections"); + if (!resultSet2.next()) { + fail(); + } + + while (resultSet2.next()) { + // After close the first connection, the current record count change into one. + Assert.assertEquals(1, resultSet2.getInt(1)); + } + } catch (Exception e) { + LOGGER.error("{}", e.getMessage(), e); + fail(e.getMessage()); + } + } + + // Create two connections on the same datanode, validate normal test case. + @Test + public void testSameDataNodeGetConnections() { + Connection conn = null; + int dataNodeId = (int) allDataNodeId.toArray()[0]; + try (Connection connection = + EnvFactory.getEnv() + .getConnection( + EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).get(), + CommonDescriptor.getInstance().getConfig().getDefaultAdminName(), + CommonDescriptor.getInstance().getConfig().getAdminPassword(), + BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("USE information_schema"); + + ResultSet resultSet = + statement.executeQuery( + "SELECT * FROM connections WHERE data_node_id = '" + dataNodeId + "'"); + if (!resultSet.next()) { + fail(); + } + + ResultSetMetaData metaData = resultSet.getMetaData(); + Assert.assertEquals(COLUMN_AMOUNT, metaData.getColumnCount()); + while (resultSet.next()) { + LOGGER.info( + "{}, {}, {}, {}, {}, {}", + resultSet.getString(1), + resultSet.getString(2), + resultSet.getString(3), + resultSet.getString(4), + resultSet.getTimestamp(5), + resultSet.getString(6)); + } + + conn = connection; + } catch (Exception e) { + LOGGER.error("{}", e.getMessage(), e); + fail(e.getMessage()); + } + + // Create the second connection on the same datanode. + try (Connection connection1 = + EnvFactory.getEnv() + .getConnection( + EnvFactory.getEnv().dataNodeIdToWrapper(dataNodeId).get(), + CommonDescriptor.getInstance().getConfig().getDefaultAdminName(), + CommonDescriptor.getInstance().getConfig().getAdminPassword(), + BaseEnv.TABLE_SQL_DIALECT); + Statement statement1 = connection1.createStatement()) { + statement1.execute("USE information_schema"); + ResultSet resultSet1 = statement1.executeQuery("SELECT COUNT(*) FROM connections"); + if (!resultSet1.next()) { + fail(); + } + + while (resultSet1.next()) { + // Before close the first connection, the current record count must be two. + Assert.assertEquals(2, resultSet1.getInt(1)); + } + + conn.close(); + + ResultSet resultSet2 = statement1.executeQuery("SELECT COUNT(*) FROM connections"); + if (!resultSet2.next()) { + fail(); + } + + while (resultSet2.next()) { + // After close the first connection, the current record count change into one. + Assert.assertEquals(1, resultSet2.getInt(1)); + } + } catch (Exception e) { + LOGGER.error("{}", e.getMessage(), e); + fail(e.getMessage()); + } + } + + // Validate normal test case when close one datanode. + @Test + public void testClosedDataNodeGetConnections() throws Exception { + if (allDataNodeId.size() <= 1) { + return; + } + int closedDataNodeId = (int) allDataNodeId.toArray()[0]; + try (Connection connection = + EnvFactory.getEnv() + .getConnection( + EnvFactory.getEnv().dataNodeIdToWrapper(closedDataNodeId).get(), + CommonDescriptor.getInstance().getConfig().getDefaultAdminName(), + CommonDescriptor.getInstance().getConfig().getAdminPassword(), + BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("USE information_schema"); + + ResultSet resultSet = + statement.executeQuery( + "SELECT COUNT(*) FROM connections WHERE data_node_id = '" + closedDataNodeId + "'"); + if (!resultSet.next()) { + fail(); + } + // All records corresponding the datanode exist Before close the datanode. Validate result + // larger than zero. + Assert.assertTrue(resultSet.getInt(1) > 0); + } catch (Exception e) { + LOGGER.error("{}", e.getMessage(), e); + fail(e.getMessage()); + } + + // close the number closedDataNodeId datanode + EnvFactory.getEnv().dataNodeIdToWrapper(closedDataNodeId).get().stop(); + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + + // Wait for shutdown check + while (true) { + AtomicBoolean containUnknown = new AtomicBoolean(false); + TShowDataNodesResp showDataNodesResp = client.showDataNodes(); + showDataNodesResp + .getDataNodesInfoList() + .forEach( + dataNodeInfo -> { + if (NodeStatus.Unknown.getStatus().equals(dataNodeInfo.getStatus())) { + containUnknown.set(true); + } + }); + + if (containUnknown.get()) { + break; + } + TimeUnit.SECONDS.sleep(1); + } + } + + int activeDataNodeId = (int) allDataNodeId.toArray()[1]; + try (Connection connection = + EnvFactory.getEnv() + .getConnection( + EnvFactory.getEnv().dataNodeIdToWrapper(activeDataNodeId).get(), + CommonDescriptor.getInstance().getConfig().getDefaultAdminName(), + CommonDescriptor.getInstance().getConfig().getAdminPassword(), + BaseEnv.TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("USE information_schema"); + + ResultSet resultSet = + statement.executeQuery( + "SELECT COUNT(*) FROM connections WHERE data_node_id = '" + closedDataNodeId + "'"); + if (!resultSet.next()) { + fail(); + } + // All records corresponding the datanode will be cleared After close the datanode. Validate + // result if it is zero. + Assert.assertEquals(0, resultSet.getLong(1)); + } catch (Exception e) { + LOGGER.error("{}", e.getMessage(), e); + fail(e.getMessage()); + } + + // revert environment + EnvFactory.getEnv().dataNodeIdToWrapper(closedDataNodeId).get().start(); + try (SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + // Wait for restart check + while (true) { + AtomicBoolean containUnknown = new AtomicBoolean(false); + TShowDataNodesResp showDataNodesResp = client.showDataNodes(); + showDataNodesResp + .getDataNodesInfoList() + .forEach( + dataNodeInfo -> { + if (NodeStatus.Unknown.getStatus().equals(dataNodeInfo.getStatus())) { + containUnknown.set(true); + } + }); + + if (!containUnknown.get()) { + break; + } + TimeUnit.SECONDS.sleep(1); + } + } + } + + @Test + public void testNoAuthUserGetConnections() { + try (Connection connection = + EnvFactory.getEnv().getConnection("test", "test123123456", TABLE_SQL_DIALECT); + Statement statement = connection.createStatement()) { + statement.execute("USE information_schema"); + ResultSet resultSet = statement.executeQuery("SELECT * FROM connections"); + if (!resultSet.next()) { + fail(); + } + ResultSetMetaData metaData = resultSet.getMetaData(); + Assert.assertEquals(COLUMN_AMOUNT, metaData.getColumnCount()); + } catch (SQLException e) { + Assert.assertEquals( + "803: Access Denied: No permissions for this operation, please add privilege SYSTEM", + e.getMessage()); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java index 21d4121ffd3..351806de099 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/IClientSession.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.protocol.session; import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion; +import org.apache.iotdb.db.queryengine.common.ConnectionInfo; import org.apache.iotdb.service.rpc.thrift.TSConnectionInfo; import org.apache.iotdb.service.rpc.thrift.TSConnectionType; @@ -56,6 +57,8 @@ public abstract class IClientSession { @Nullable private String databaseName; + private long lastActiveTime; + public abstract String getClientAddress(); public abstract int getClientPort(); @@ -140,6 +143,11 @@ public abstract class IClientSession { getUsername(), getLogInTime(), getConnectionId(), getConnectionType()); } + public ConnectionInfo convertToConnectionInfo() { + return new ConnectionInfo( + getUserId(), getUsername(), getId(), getLastActiveTime(), getClientAddress()); + } + /** * statementIds that this client opens.<br> * For JDBC clients, each Statement instance has a statement id.<br> @@ -180,6 +188,14 @@ public abstract class IClientSession { this.databaseName = databaseName; } + public long getLastActiveTime() { + return lastActiveTime; + } + + public void setLastActiveTime(long lastActiveTime) { + this.lastActiveTime = lastActiveTime; + } + public enum SqlDialect { TREE((byte) 0), TABLE((byte) 1); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java index d098cf70223..a4a28efa5ec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java @@ -38,6 +38,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.auth.LoginLockManager; import org.apache.iotdb.db.protocol.basic.BasicOpenSessionResp; import org.apache.iotdb.db.protocol.thrift.OperationType; +import org.apache.iotdb.db.queryengine.common.ConnectionInfo; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.storageengine.dataregion.read.control.QueryResourceManager; import org.apache.iotdb.db.utils.DataNodeAuthUtils; @@ -58,6 +59,7 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Comparator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.TimeZone; @@ -386,6 +388,10 @@ public class SessionManager implements SessionManagerMBean { /** update connection idle time after execution. */ public void updateIdleTime() { currSessionIdleTime.set(System.nanoTime()); + IClientSession session = currSession.get(); + if (session != null) { + session.setLastActiveTime(CommonDateTimeUtils.currentTime()); + } } public TimeZone getSessionTimeZone() { @@ -538,6 +544,14 @@ public class SessionManager implements SessionManagerMBean { .collect(Collectors.toList())); } + public List<ConnectionInfo> getAllSessionConnectionInfo() { + return sessions.keySet().stream() + .filter(s -> StringUtils.isNotEmpty(s.getUsername())) + .map(IClientSession::convertToConnectionInfo) + .sorted(Comparator.comparingLong(ConnectionInfo::getLastActiveTime)) + .collect(Collectors.toList()); + } + private static class SessionManagerHelper { private static final SessionManager INSTANCE = new SessionManager(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/ConnectionInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/ConnectionInfo.java new file mode 100644 index 00000000000..3889e533187 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/ConnectionInfo.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.common; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; + +public class ConnectionInfo { + private static final int dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); + private final long userId; + private final String userName; + private final long sessionId; + private final long lastActiveTime; + private final String clientAddress; + + public ConnectionInfo( + long userId, String userName, long sessionId, long lastActiveTime, String clientAddress) { + this.userId = userId; + this.userName = userName; + this.sessionId = sessionId; + this.lastActiveTime = lastActiveTime; + this.clientAddress = clientAddress; + } + + public int getDataNodeId() { + return dataNodeId; + } + + public long getUserId() { + return userId; + } + + public String getUserName() { + return userName; + } + + public long getSessionId() { + return sessionId; + } + + public long getLastActiveTime() { + return lastActiveTime; + } + + public String getClientAddress() { + return clientAddress; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java index 1449b565c0e..fc688816565 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java @@ -71,6 +71,8 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.protocol.client.ainode.AINodeClient; import org.apache.iotdb.db.protocol.client.ainode.AINodeClientManager; import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.SessionManager; +import org.apache.iotdb.db.queryengine.common.ConnectionInfo; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.ShowCreateViewTask; @@ -126,6 +128,9 @@ import static org.apache.iotdb.db.queryengine.plan.execution.config.metadata.Sho import static org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowPipePluginsTask.PIPE_PLUGIN_TYPE_EXTERNAL; public class InformationSchemaContentSupplierFactory { + + private static final SessionManager sessionManager = SessionManager.getInstance(); + private InformationSchemaContentSupplierFactory() {} public static Iterator<TsBlock> getSupplier( @@ -166,6 +171,8 @@ public class InformationSchemaContentSupplierFactory { return new ConfigNodesSupplier(dataTypes, userEntity); case InformationSchema.DATA_NODES: return new DataNodesSupplier(dataTypes, userEntity); + case InformationSchema.CONNECTIONS: + return new ConnectionsSupplier(dataTypes, userEntity); default: throw new UnsupportedOperationException("Unknown table: " + tableName); } @@ -1255,4 +1262,36 @@ public class InformationSchemaContentSupplierFactory { protected abstract void constructLine(); } + + private static class ConnectionsSupplier extends TsBlockSupplier { + private Iterator<ConnectionInfo> sessionConnectionIterator; + + private ConnectionsSupplier(final List<TSDataType> dataTypes, final UserEntity userEntity) { + super(dataTypes); + accessControl.checkUserGlobalSysPrivilege(userEntity); + sessionConnectionIterator = sessionManager.getAllSessionConnectionInfo().iterator(); + } + + @Override + protected void constructLine() { + ConnectionInfo connectionInfo = sessionConnectionIterator.next(); + columnBuilders[0].writeBinary( + new Binary(String.valueOf(connectionInfo.getDataNodeId()), TSFileConfig.STRING_CHARSET)); + columnBuilders[1].writeBinary( + new Binary(String.valueOf(connectionInfo.getUserId()), TSFileConfig.STRING_CHARSET)); + columnBuilders[2].writeBinary( + new Binary(String.valueOf(connectionInfo.getSessionId()), TSFileConfig.STRING_CHARSET)); + columnBuilders[3].writeBinary( + new Binary(connectionInfo.getUserName(), TSFileConfig.STRING_CHARSET)); + columnBuilders[4].writeLong(connectionInfo.getLastActiveTime()); + columnBuilders[5].writeBinary( + new Binary(connectionInfo.getClientAddress(), TSFileConfig.STRING_CHARSET)); + resultBuilder.declarePosition(); + } + + @Override + public boolean hasNext() { + return sessionConnectionIterator.hasNext(); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java index b2e384fbd44..4676559bd7b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java @@ -85,6 +85,7 @@ public class DataNodeLocationSupplierFactory { public List<TDataNodeLocation> getDataNodeLocations(final String tableName) { switch (tableName) { case InformationSchema.QUERIES: + case InformationSchema.CONNECTIONS: return getReadableDataNodeLocations(); case InformationSchema.DATABASES: case InformationSchema.TABLES: diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java index 4f78ad9d5ea..943bdeb9cba 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java @@ -205,6 +205,14 @@ public class ColumnHeaderConstant { public static final String ELAPSED_TIME = "ElapsedTime"; public static final String STATEMENT = "Statement"; + // column names for show idle connection + public static final String DATANODE_ID = "data_node_id"; + public static final String USERID = "user_id"; + public static final String SESSION_ID = "session_id"; + public static final String USER_NAME = "user_name"; + public static final String LAST_ACTIVE_TIME = "last_active_time"; + public static final String CLIENT_IP = "client_ip"; + public static final String QUERY_ID_TABLE_MODEL = "query_id"; public static final String QUERY_ID_START_TIME_TABLE_MODEL = "start_time"; public static final String DATA_NODE_ID_TABLE_MODEL = "datanode_id"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java index c58ad14989c..2db41cc3c2d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java @@ -50,6 +50,7 @@ public class InformationSchema { public static final String NODES = "nodes"; public static final String CONFIG_NODES = "config_nodes"; public static final String DATA_NODES = "data_nodes"; + public static final String CONNECTIONS = "connections"; static { final TsTable queriesTable = new TsTable(QUERIES); @@ -362,6 +363,22 @@ public class InformationSchema { ColumnHeaderConstant.SCHEMA_CONSENSUS_PORT_TABLE_MODEL, TSDataType.INT32)); dataNodesTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME); schemaTables.put(DATA_NODES, dataNodesTable); + + final TsTable connectionsTable = new TsTable(CONNECTIONS); + connectionsTable.addColumnSchema( + new TagColumnSchema(ColumnHeaderConstant.DATANODE_ID, TSDataType.STRING)); + connectionsTable.addColumnSchema( + new TagColumnSchema(ColumnHeaderConstant.USERID, TSDataType.STRING)); + connectionsTable.addColumnSchema( + new TagColumnSchema(ColumnHeaderConstant.SESSION_ID, TSDataType.STRING)); + connectionsTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.USER_NAME, TSDataType.STRING)); + connectionsTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.LAST_ACTIVE_TIME, TSDataType.TIMESTAMP)); + connectionsTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.CLIENT_IP, TSDataType.STRING)); + connectionsTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME); + schemaTables.put(CONNECTIONS, connectionsTable); } public static Map<String, TsTable> getSchemaTables() {
