This is an automated email from the ASF dual-hosted git repository. hxd pushed a commit to branch feature-client-session-0.13 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 32178b0e5e4560643fa1bf4718b911b9a04e94b8 Author: xiangdong huang <[email protected]> AuthorDate: Sun Oct 23 23:45:10 2022 +0800 use threadLocal<ClientSession> to replace session id --- .../java/org/apache/iotdb/cli/AbstractCli.java | 2 +- .../main/java/org/apache/iotdb/tool/ImportCsv.java | 4 +- .../iotdb/cluster/server/ClusterRPCService.java | 9 +- external-api/pom.xml | 8 + .../external/api/thrift/JudgableServerContext.java | 41 +++ .../external/api/thrift/ServerContextFactory.java | 23 ++ .../iotdb/db/protocol/mqtt/PublishHandler.java | 35 ++- .../iotdb/db/qp/physical/crud/GroupByTimePlan.java | 2 +- .../apache/iotdb/db/qp/utils/DateTimeUtils.java | 2 +- .../iotdb/db/query/control/SessionManager.java | 153 +++++---- .../db/query/control/SessionTimeoutManager.java | 33 +- .../query/control/clientsession/ClientSession.java | 41 +++ .../control/clientsession/IClientSession.java | 101 ++++++ .../control/clientsession/MqttClientSession.java | 46 +++ .../dataset/groupby/GroupByEngineDataSet.java | 2 +- .../apache/iotdb/db/query/executor/fill/IFill.java | 2 +- .../iotdb/db/service/basic/ServiceProvider.java | 75 +++-- .../thrift/TThreadPoolServerWithContext.java | 150 +++++++++ .../db/service/thrift/ThriftServiceThread.java | 3 +- .../thrift/handler/RPCServiceThriftHandler.java | 67 +++- .../service/thrift/impl/InfluxDBServiceImpl.java | 42 ++- .../db/service/thrift/impl/TSServiceImpl.java | 341 +++++++++++---------- 22 files changed, 862 insertions(+), 320 deletions(-) diff --git a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java index 1cbed9d0af..9de6b5608d 100644 --- a/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java +++ b/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java @@ -578,7 +578,7 @@ public abstract class AbstractCli { * @param columnCount the number of column * @param resultSetMetaData jdbc resultSetMetaData * @param zoneId your time zone - * @return List<List<String>> result + * @return {@literal List<List<String>> result} * @throws SQLException throw exception */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning diff --git a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java index f0495c43f8..c966382104 100644 --- a/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java +++ b/cli/src/main/java/org/apache/iotdb/tool/ImportCsv.java @@ -641,8 +641,8 @@ public class ImportCsv extends AbstractCsvTool { * read data from the CSV file * * @param path - * @return - * @throws IOException + * @return CSVParser csv parser + * @throws IOException when reading the csv file failed. */ private static CSVParser readCsvFile(String path) throws IOException { return CSVFormat.Builder.create(CSVFormat.DEFAULT) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java index f7c5e95854..68e4c08a2e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterRPCService.java @@ -20,10 +20,12 @@ package org.apache.iotdb.cluster.server; import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.cluster.query.manage.ClusterSessionManager; import org.apache.iotdb.db.concurrent.ThreadName; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.runtime.RPCServiceException; +import org.apache.iotdb.db.query.control.SessionManager; import org.apache.iotdb.db.service.ServiceType; import org.apache.iotdb.db.service.thrift.ProcessorWithMetrics; import org.apache.iotdb.db.service.thrift.ThriftService; @@ -79,7 +81,12 @@ public class ClusterRPCService extends ThriftService implements ClusterRPCServic getBindPort(), config.getRpcMaxConcurrentClientNum(), config.getThriftServerAwaitTimeForStopService(), - new RPCServiceThriftHandler(impl), + new RPCServiceThriftHandler(impl) { + @Override + protected SessionManager getSessionManager() { + return ClusterSessionManager.getInstance(); + } + }, IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()); } catch (RPCServiceException e) { throw new IllegalAccessException(e.getMessage()); diff --git a/external-api/pom.xml b/external-api/pom.xml index 086c6a08b4..28ec1eed74 100644 --- a/external-api/pom.xml +++ b/external-api/pom.xml @@ -28,6 +28,14 @@ </parent> <modelVersion>4.0.0</modelVersion> <artifactId>external-api</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + <version>${thrift.version}</version> + <scope>provided</scope> + </dependency> + </dependencies> <profiles> <profile> <id>get-jar-with-dependencies</id> diff --git a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java new file mode 100644 index 0000000000..5948804a57 --- /dev/null +++ b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/JudgableServerContext.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.external.api.thrift; + +import org.apache.thrift.server.ServerContext; + +public interface JudgableServerContext extends ServerContext { + + /** + * this method will be called when a client connects to the IoTDB server. + * + * @return true / false + */ + public boolean authorised(); + + @Override + default <T> T unwrap(Class<T> iface) { + return null; + } + + @Override + default boolean isWrapperFor(Class<?> iface) { + return false; + }; +} diff --git a/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java new file mode 100644 index 0000000000..b6788b376a --- /dev/null +++ b/external-api/src/main/java/org/apache/iotdb/external/api/thrift/ServerContextFactory.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.external.api.thrift; + +public interface ServerContextFactory { + JudgableServerContext newServerContext(); +} diff --git a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java index 892e17cde5..8c6a1d9ad5 100644 --- a/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java +++ b/server/src/main/java/org/apache/iotdb/db/protocol/mqtt/PublishHandler.java @@ -22,8 +22,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.metadata.path.PartialPath; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; +import org.apache.iotdb.db.query.control.clientsession.MqttClientSession; import org.apache.iotdb.db.service.IoTDB; -import org.apache.iotdb.db.service.basic.BasicOpenSessionResp; import org.apache.iotdb.db.service.basic.ServiceProvider; import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion; import org.apache.iotdb.service.rpc.thrift.TSStatus; @@ -46,7 +46,8 @@ import java.util.concurrent.ConcurrentHashMap; public class PublishHandler extends AbstractInterceptHandler { private final ServiceProvider serviceProvider = IoTDB.serviceProvider; - private final ConcurrentHashMap<String, Long> clientIdToSessionIdMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<String, MqttClientSession> clientIdToSessionMap = + new ConcurrentHashMap<>(); private static final boolean isEnableOperationSync = IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync(); private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class); @@ -68,15 +69,17 @@ public class PublishHandler extends AbstractInterceptHandler { @Override public void onConnect(InterceptConnectMessage msg) { - if (!clientIdToSessionIdMap.containsKey(msg.getClientID())) { + if (!clientIdToSessionMap.containsKey(msg.getClientID())) { try { - BasicOpenSessionResp basicOpenSessionResp = - serviceProvider.openSession( - msg.getUsername(), - new String(msg.getPassword()), - ZoneId.systemDefault().toString(), - TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3); - clientIdToSessionIdMap.put(msg.getClientID(), basicOpenSessionResp.getSessionId()); + MqttClientSession session = new MqttClientSession(msg.getClientID()); + // TODO should we put this session into a ThreadLocal in SessionManager? + serviceProvider.login( + session, + msg.getUsername(), + new String(msg.getPassword()), + ZoneId.systemDefault().toString(), + TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3); + clientIdToSessionMap.put(msg.getClientID(), session); } catch (TException e) { throw new RuntimeException(e); } @@ -85,19 +88,19 @@ public class PublishHandler extends AbstractInterceptHandler { @Override public void onDisconnect(InterceptDisconnectMessage msg) { - Long sessionId = clientIdToSessionIdMap.remove(msg.getClientID()); - if (null != sessionId) { - serviceProvider.closeSession(sessionId); + MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID()); + if (null != session) { + serviceProvider.closeSession(session); } } @Override public void onPublish(InterceptPublishMessage msg) { String clientId = msg.getClientID(); - if (!clientIdToSessionIdMap.containsKey(clientId)) { + if (!clientIdToSessionMap.containsKey(clientId)) { return; } - long sessionId = clientIdToSessionIdMap.get(clientId); + MqttClientSession session = clientIdToSessionMap.get(clientId); ByteBuf payload = msg.getPayload(); String topic = msg.getTopicName(); String username = msg.getUsername(); @@ -132,7 +135,7 @@ public class PublishHandler extends AbstractInterceptHandler { event.getTimestamp(), event.getMeasurements().toArray(new String[0]), event.getValues().toArray(new String[0])); - TSStatus tsStatus = serviceProvider.checkAuthority(plan, sessionId); + TSStatus tsStatus = serviceProvider.checkAuthority(plan, session); if (tsStatus != null) { LOG.warn(tsStatus.message); } else { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java index 1fab54b56c..bfb8785eca 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java @@ -128,7 +128,7 @@ public class GroupByTimePlan extends AggregationPlan { plan.getEndTime(), plan.isSlidingStepByMonth(), plan.isIntervalByMonth(), - SessionManager.getInstance().getCurrSessionTimeZone()))); + SessionManager.getInstance().getSessionTimeZone()))); } else { return new GlobalTimeExpression( new GroupByFilter( diff --git a/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java b/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java index b6901726bd..eb25720a2c 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/utils/DateTimeUtils.java @@ -584,7 +584,7 @@ public class DateTimeUtils { res *= 30 * 86_400_000L; } else { Calendar calendar = Calendar.getInstance(); - calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone()); + calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone()); calendar.setTimeInMillis(currentTime); calendar.add(Calendar.MONTH, (int) (value)); res = calendar.getTimeInMillis() - currentTime; diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java index 7e1861eeed..061f635d81 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionManager.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.query.control.clientsession.IClientSession; import org.apache.iotdb.db.query.dataset.UDTFDataSet; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; @@ -34,69 +35,110 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicLong; +/** + * Session Manager is for mananging active sessions. It will be used by both Thrift based services + * (i.e., TSServiceImpl and InfluxdbService) and Mqtt based service. <br> + * Thrift based services are client-thread model, i.e., each client has a thread. So we can use + * threadLocal for such services.<br> + * However, Mqtt based service use message-thread model, i.e, each message has a short thread. So, + * we can not use threadLocal for such services. + */ public class SessionManager { private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class); // When the client abnormally exits, we can still know who to disconnect - private final ThreadLocal<Long> currSessionId = new ThreadLocal<>(); - // Record the username for every rpc connection (session). - private final Map<Long, String> sessionIdToUsername = new ConcurrentHashMap<>(); - private final Map<Long, ZoneId> sessionIdToZoneId = new ConcurrentHashMap<>(); + /** currSession can be only used in client-thread model services. */ + private final ThreadLocal<IClientSession> currSession = new ThreadLocal<>(); + + // we keep this sessionIdGenerator just for keep Compatible with v0.13 + @Deprecated private final AtomicLong sessionIdGenerator = new AtomicLong(); - // The sessionId is unique in one IoTDB instance. - private final AtomicLong sessionIdGenerator = new AtomicLong(); // The statementId is unique in one IoTDB instance. private final AtomicLong statementIdGenerator = new AtomicLong(); // (sessionId -> Set(statementId)) - private final Map<Long, Set<Long>> sessionIdToStatementId = new ConcurrentHashMap<>(); + private final Map<IClientSession, Set<Long>> sessionToStatementId = new ConcurrentHashMap<>(); // (statementId -> Set(queryId)) private final Map<Long, Set<Long>> statementIdToQueryId = new ConcurrentHashMap<>(); // (queryId -> QueryDataSet) private final Map<Long, QueryDataSet> queryIdToDataSet = new ConcurrentHashMap<>(); - // (sessionId -> client version number) - private final Map<Long, IoTDBConstant.ClientVersion> sessionIdToClientVersion = - new ConcurrentHashMap<>(); - protected SessionManager() { // singleton } - public Long getCurrSessionId() { - return currSessionId.get(); - } - - public void removeCurrSessionId() { - currSessionId.remove(); + /** + * this method can be only used in client-thread model. + * + * @return + */ + public IClientSession getCurrSession() { + return currSession.get(); } - public TimeZone getCurrSessionTimeZone() { - if (getCurrSessionId() != null) { - return TimeZone.getTimeZone(SessionManager.getInstance().getZoneId(getCurrSessionId())); + public TimeZone getSessionTimeZone() { + IClientSession session = currSession.get(); + if (session != null) { + return session.getTimeZone(); } else { // only used for test return TimeZone.getTimeZone("+08:00"); } } - public long requestSessionId( - String username, String zoneId, IoTDBConstant.ClientVersion clientVersion) { - long sessionId = sessionIdGenerator.incrementAndGet(); - - currSessionId.set(sessionId); - sessionIdToUsername.put(sessionId, username); - sessionIdToZoneId.put(sessionId, ZoneId.of(zoneId)); - sessionIdToClientVersion.put(sessionId, clientVersion); + /** + * this method can be only used in client-thread model. But, in message-thread model based + * service, calling this method has no side effect. <br> + * MUST CALL THIS METHOD IN client-thread model services. Fortunately, we can just call this + * method in thrift's event handler. + * + * @return + */ + public void removeCurrSession() { + currSession.remove(); + } - return sessionId; + /** + * this method can be only used in client-thread model. Do not use this method in message-thread + * model based service. + * + * @param session + * @return false if the session has been initialized. + */ + public boolean registerSession(IClientSession session) { + if (this.currSession.get() != null) { + LOGGER.error("the client session is registered repeatedly, pls check whether this is a bug."); + return false; + } + this.currSession.set(session); + return true; } - public boolean releaseSessionResource(long sessionId) { - sessionIdToZoneId.remove(sessionId); - sessionIdToClientVersion.remove(sessionId); + /** + * must be called after registerSession()) will mark the session login. + * + * @param username + * @param zoneId + * @param clientVersion + */ + public void supplySession( + IClientSession session, + String username, + String zoneId, + IoTDBConstant.ClientVersion clientVersion) { + session.setId(sessionIdGenerator.incrementAndGet()); + session.setUsername(username); + session.setZoneId(ZoneId.of(zoneId)); + session.setClientVersion(clientVersion); + session.setLogin(true); + } - Set<Long> statementIdSet = sessionIdToStatementId.remove(sessionId); + /** + * @param session + * @return true if releasing successfully, false otherwise (e.g., the session does not exist) + */ + public boolean releaseSessionResource(IClientSession session) { + Set<Long> statementIdSet = sessionToStatementId.remove(session); if (statementIdSet != null) { for (Long statementId : statementIdSet) { Set<Long> queryIdSet = statementIdToQueryId.remove(statementId); @@ -106,34 +148,41 @@ public class SessionManager { } } } + return true; } - - return sessionIdToUsername.remove(sessionId) != null; + // TODO if there is no statement for the session, how to return (true or false?) + return false; } - public long getSessionIdByQueryId(long queryId) { + /** + * @param queryId + * @return null if not found. (e.g., the client session is closed already. TODO: do we really have + * this case?) + */ + public IClientSession getSessionIdByQueryId(long queryId) { // TODO: make this more efficient with a queryId -> sessionId map for (Map.Entry<Long, Set<Long>> statementToQueries : statementIdToQueryId.entrySet()) { if (statementToQueries.getValue().contains(queryId)) { - for (Map.Entry<Long, Set<Long>> sessionToStatements : sessionIdToStatementId.entrySet()) { + for (Map.Entry<IClientSession, Set<Long>> sessionToStatements : + sessionToStatementId.entrySet()) { if (sessionToStatements.getValue().contains(statementToQueries.getKey())) { return sessionToStatements.getKey(); } } } } - return -1; + return null; } - public long requestStatementId(long sessionId) { + public long requestStatementId(IClientSession session) { long statementId = statementIdGenerator.incrementAndGet(); - sessionIdToStatementId - .computeIfAbsent(sessionId, s -> new CopyOnWriteArraySet<>()) + sessionToStatementId + .computeIfAbsent(session, s -> new CopyOnWriteArraySet<>()) .add(statementId); return statementId; } - public void closeStatement(long sessionId, long statementId) { + public void closeStatement(IClientSession session, long statementId) { Set<Long> queryIdSet = statementIdToQueryId.remove(statementId); if (queryIdSet != null) { for (Long queryId : queryIdSet) { @@ -141,8 +190,8 @@ public class SessionManager { } } - if (sessionIdToStatementId.containsKey(sessionId)) { - sessionIdToStatementId.get(sessionId).remove(statementId); + if (sessionToStatementId.containsKey(session)) { + sessionToStatementId.get(session).remove(statementId); } } @@ -176,18 +225,6 @@ public class SessionManager { } } - public String getUsername(Long sessionId) { - return sessionIdToUsername.get(sessionId); - } - - public ZoneId getZoneId(Long sessionId) { - return sessionIdToZoneId.get(sessionId); - } - - public void setTimezone(Long sessionId, String zone) { - sessionIdToZoneId.put(sessionId, ZoneId.of(zone)); - } - public boolean hasDataset(Long queryId) { return queryIdToDataSet.containsKey(queryId); } @@ -211,10 +248,6 @@ public class SessionManager { } } - public IoTDBConstant.ClientVersion getClientVersion(Long sessionId) { - return sessionIdToClientVersion.get(sessionId); - } - public static SessionManager getInstance() { return SessionManagerHelper.INSTANCE; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java index 1f179c5e40..c9ff8efb02 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java +++ b/server/src/main/java/org/apache/iotdb/db/query/control/SessionTimeoutManager.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.control; import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.query.control.clientsession.IClientSession; import org.apache.iotdb.db.service.basic.ServiceProvider; import org.slf4j.Logger; @@ -37,7 +38,7 @@ public class SessionTimeoutManager { private static final long SESSION_TIMEOUT = IoTDBDescriptor.getInstance().getConfig().getSessionTimeoutThreshold(); - private Map<Long, Long> sessionIdToLastActiveTime; + private Map<IClientSession, Long> sessionToLastActiveTime; private ScheduledExecutorService executorService; private SessionTimeoutManager() { @@ -45,7 +46,7 @@ public class SessionTimeoutManager { return; } - this.sessionIdToLastActiveTime = new ConcurrentHashMap<>(); + this.sessionToLastActiveTime = new ConcurrentHashMap<>(); this.executorService = IoTDBThreadPoolFactory.newScheduledThreadPool(1, "session-timeout-manager"); @@ -59,37 +60,43 @@ public class SessionTimeoutManager { TimeUnit.MILLISECONDS); } - public void register(long sessionId) { + public void register(IClientSession session) { if (SESSION_TIMEOUT == 0) { return; } - sessionIdToLastActiveTime.put(sessionId, System.currentTimeMillis()); + sessionToLastActiveTime.put(session, System.currentTimeMillis()); } - public boolean unregister(long sessionId) { + /** + * unregister the session and release all its query resources. + * + * @param session + * @return true if removing successfully, false otherwise (e.g., the session does not exist) + */ + public boolean unregister(IClientSession session) { if (SESSION_TIMEOUT == 0) { - return ServiceProvider.SESSION_MANAGER.releaseSessionResource(sessionId); + return ServiceProvider.SESSION_MANAGER.releaseSessionResource(session); } - if (ServiceProvider.SESSION_MANAGER.releaseSessionResource(sessionId)) { - return sessionIdToLastActiveTime.remove(sessionId) != null; + if (ServiceProvider.SESSION_MANAGER.releaseSessionResource(session)) { + return sessionToLastActiveTime.remove(session) != null; } return false; } - public void refresh(long sessionId) { + public void refresh(IClientSession session) { if (SESSION_TIMEOUT == 0) { return; } - sessionIdToLastActiveTime.computeIfPresent(sessionId, (k, v) -> System.currentTimeMillis()); + sessionToLastActiveTime.computeIfPresent(session, (k, v) -> System.currentTimeMillis()); } private void cleanup() { long currentTime = System.currentTimeMillis(); - sessionIdToLastActiveTime.entrySet().stream() + sessionToLastActiveTime.entrySet().stream() .filter(entry -> entry.getValue() + SESSION_TIMEOUT < currentTime) .forEach( entry -> { @@ -106,11 +113,11 @@ public class SessionTimeoutManager { return SessionTimeoutManagerHelper.INSTANCE; } - public boolean isSessionAlive(long sessionId) { + public boolean isSessionAlive(IClientSession session) { if (SESSION_TIMEOUT == 0) { return true; } - return sessionIdToLastActiveTime.containsKey(sessionId); + return sessionToLastActiveTime.containsKey(session); } private static class SessionTimeoutManagerHelper { diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java new file mode 100644 index 0000000000..9edcc6f077 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/ClientSession.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.query.control.clientsession; + +import java.net.InetSocketAddress; + +/** Client Session is the only identity for a connection. */ +public class ClientSession extends IClientSession { + + InetSocketAddress clientNet; + + public ClientSession(InetSocketAddress clientNet) { + this.clientNet = clientNet; + } + + @Override + public String getClientAddress() { + return clientNet.getHostName(); + } + + @Override + int getClientPort() { + return clientNet.getPort(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java new file mode 100644 index 0000000000..b3ae42e68b --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/IClientSession.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.query.control.clientsession; + +import org.apache.iotdb.db.conf.IoTDBConstant.ClientVersion; + +import java.time.ZoneId; +import java.util.TimeZone; + +public abstract class IClientSession { + + /** id is just used for keep compatible with v0.13 */ + @Deprecated long id; + + ClientVersion clientVersion; + + ZoneId zoneId; + + // TODO: why some Statement Plans use timeZone while others use ZoneId? + TimeZone timeZone; + + String username; + + boolean login = false; + + abstract String getClientAddress(); + + abstract int getClientPort(); + + public void setClientVersion(ClientVersion clientVersion) { + this.clientVersion = clientVersion; + } + + public ClientVersion getClientVersion() { + return this.clientVersion; + } + + public ZoneId getZoneId() { + return this.zoneId; + } + + public void setZoneId(ZoneId zoneId) { + this.zoneId = zoneId; + this.timeZone = TimeZone.getTimeZone(zoneId); + } + + public TimeZone getTimeZone() { + return timeZone; + } + + public void setTimeZone(TimeZone timeZone) { + this.timeZone = timeZone; + this.zoneId = timeZone.toZoneId(); + } + + public String getUsername() { + return this.username; + } + + public void setUsername(String username) { + this.username = username; + } + + public boolean isLogin() { + return login; + } + + public void setLogin(boolean login) { + this.login = login; + } + + @Deprecated + public long getId() { + return id; + } + + @Deprecated + public void setId(long id) { + this.id = id; + } + + public String toString() { + return String.format("%d-%s:%d", getId(), getClientAddress(), getClientPort()); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java new file mode 100644 index 0000000000..b458cb22f3 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/control/clientsession/MqttClientSession.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.query.control.clientsession; + +public class MqttClientSession extends IClientSession { + + String clientID; + + public MqttClientSession(String clientID) { + this.clientID = clientID; + } + + public String getClientID() { + return clientID; + } + + @Override + public String getClientAddress() { + return clientID; + } + + @Override + public int getClientPort() { + return 0; + } + + public String toString() { + return String.format("%d-%s", getId(), getClientID()); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java index ccd374b9b3..ed54e5920a 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java @@ -209,7 +209,7 @@ public abstract class GroupByEngineDataSet extends QueryDataSet { */ public static long calcIntervalByMonth(long startTime, long numMonths) { Calendar calendar = Calendar.getInstance(); - calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone()); + calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone()); calendar.setTimeInMillis(startTime); boolean isLastDayOfMonth = calendar.get(Calendar.DAY_OF_MONTH) == calendar.getActualMaximum(Calendar.DAY_OF_MONTH); diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java index 5fbb0575dd..0ccfb23e15 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/IFill.java @@ -143,7 +143,7 @@ public abstract class IFill { protected long slideMonth(long startTime, int monthNum) { Calendar calendar = Calendar.getInstance(); - calendar.setTimeZone(SessionManager.getInstance().getCurrSessionTimeZone()); + calendar.setTimeZone(SessionManager.getInstance().getSessionTimeZone()); calendar.setTimeInMillis(startTime); calendar.add(Calendar.MONTH, monthNum); return calendar.getTimeInMillis(); diff --git a/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java b/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java index 0806dec7a7..0987d2d97f 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java +++ b/server/src/main/java/org/apache/iotdb/db/service/basic/ServiceProvider.java @@ -41,6 +41,7 @@ import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryTimeManager; import org.apache.iotdb.db.query.control.SessionManager; import org.apache.iotdb.db.query.control.SessionTimeoutManager; +import org.apache.iotdb.db.query.control.clientsession.IClientSession; import org.apache.iotdb.db.query.control.tracing.TracingManager; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -58,6 +59,10 @@ import java.sql.SQLException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException; +/** + * There is only one ServiceProvider instance for each IoTDB instance. Both client-thread model + * based services and message-thread model based services (e.g., mqtt) are using this service. + */ public abstract class ServiceProvider { protected static final Logger LOGGER = LoggerFactory.getLogger(ServiceProvider.class); @@ -105,14 +110,13 @@ public abstract class ServiceProvider { * * @return true: If logged in; false: If not logged in */ - public boolean checkLogin(long sessionId) { - Long currSessionId = SESSION_MANAGER.getCurrSessionId(); - boolean isLoggedIn = currSessionId != null && currSessionId == sessionId; + public boolean checkLogin(IClientSession session) { + boolean isLoggedIn = session != null && session.isLogin(); if (!isLoggedIn) { LOGGER.info("{}: Not login. ", IoTDBConstant.GLOBAL_DB_NAME); return false; } else { - SessionTimeoutManager.getInstance().refresh(sessionId); + SessionTimeoutManager.getInstance().refresh(session); } return isLoggedIn; } @@ -120,11 +124,11 @@ public abstract class ServiceProvider { /** * Check whether current session is timeout. * - * @param sessionId Session id. + * @param session clientSession. * @return true: If session timeout; false: If not session timeout. */ - public boolean checkSessionTimeout(long sessionId) { - if (!SessionTimeoutManager.getInstance().isSessionAlive(sessionId)) { + public boolean checkSessionTimeout(IClientSession session) { + if (!SessionTimeoutManager.getInstance().isSessionAlive(session)) { return true; } return false; @@ -143,9 +147,9 @@ public abstract class ServiceProvider { username, plan.getAuthPaths(), plan.getOperatorType(), targetUser); } - public TSStatus checkAuthority(PhysicalPlan plan, long sessionId) { + public TSStatus checkAuthority(PhysicalPlan plan, IClientSession session) { try { - if (!checkAuthorization(plan, SESSION_MANAGER.getUsername(sessionId))) { + if (!checkAuthorization(plan, session.getUsername())) { return RpcUtils.getStatus( TSStatusCode.NO_PERMISSION_ERROR, "No permissions for this operation, please add privilege " @@ -162,7 +166,8 @@ public abstract class ServiceProvider { return null; } - public BasicOpenSessionResp openSession( + public BasicOpenSessionResp login( + IClientSession session, String username, String password, String zoneId, @@ -187,7 +192,6 @@ public abstract class ServiceProvider { loginMessage = e.getMessage(); } - long sessionId = -1; if (status) { // check the version compatibility boolean compatible = checkCompatibility(tsProtocolVersion); @@ -195,74 +199,69 @@ public abstract class ServiceProvider { openSessionResp.setCode(TSStatusCode.INCOMPATIBLE_VERSION.getStatusCode()); openSessionResp.setMessage( "The version is incompatible, please upgrade to " + IoTDBConstant.VERSION); - return openSessionResp.sessionId(sessionId); + return openSessionResp.sessionId(-1); } openSessionResp.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()); openSessionResp.setMessage("Login successfully"); - sessionId = SESSION_MANAGER.requestSessionId(username, zoneId, clientVersion); - + SESSION_MANAGER.supplySession(session, username, zoneId, clientVersion); LOGGER.info( "{}: Login status: {}. User : {}, opens Session-{}", IoTDBConstant.GLOBAL_DB_NAME, openSessionResp.getMessage(), username, - sessionId); + session); } else { openSessionResp.setMessage(loginMessage != null ? loginMessage : "Authentication failed."); openSessionResp.setCode(TSStatusCode.WRONG_LOGIN_PASSWORD_ERROR.getStatusCode()); - - sessionId = SESSION_MANAGER.requestSessionId(username, zoneId, clientVersion); AUDIT_LOGGER.info("User {} opens Session failed with an incorrect password", username); + // TODO we should close this connection ASAP, otherwise there will be DDoS. } - - SessionTimeoutManager.getInstance().register(sessionId); - return openSessionResp.sessionId(sessionId); + SessionTimeoutManager.getInstance().register(session); + return openSessionResp.sessionId(session == null ? -1 : session.getId()); } - public BasicOpenSessionResp openSession( - String username, String password, String zoneId, TSProtocolVersion tsProtocolVersion) + public BasicOpenSessionResp login( + IClientSession session, + String username, + String password, + String zoneId, + TSProtocolVersion tsProtocolVersion) throws TException { - return openSession( - username, password, zoneId, tsProtocolVersion, IoTDBConstant.ClientVersion.V_0_12); + return login( + session, username, password, zoneId, tsProtocolVersion, IoTDBConstant.ClientVersion.V_0_12); } - public boolean closeSession(long sessionId) { - AUDIT_LOGGER.info("Session-{} is closing", sessionId); - - SESSION_MANAGER.removeCurrSessionId(); - - return SessionTimeoutManager.getInstance().unregister(sessionId); + public boolean closeSession(IClientSession session) { + AUDIT_LOGGER.info("Session-{} is closing", session); + return SessionTimeoutManager.getInstance().unregister(session); } public TSStatus closeOperation( - long sessionId, + IClientSession session, long queryId, long statementId, boolean haveStatementId, boolean haveSetQueryId) { - if (!checkLogin(sessionId)) { + if (!checkLogin(session)) { return RpcUtils.getStatus( TSStatusCode.NOT_LOGIN_ERROR, "Log in failed. Either you are not authorized or the session has timed out."); } - if (checkSessionTimeout(sessionId)) { + if (checkSessionTimeout(session)) { return RpcUtils.getStatus(TSStatusCode.SESSION_TIMEOUT, "Session timeout"); } if (AUDIT_LOGGER.isDebugEnabled()) { AUDIT_LOGGER.debug( - "{}: receive close operation from Session {}", - IoTDBConstant.GLOBAL_DB_NAME, - SESSION_MANAGER.getCurrSessionId()); + "{}: receive close operation from Session {}", IoTDBConstant.GLOBAL_DB_NAME, session); } - try { if (haveStatementId) { if (haveSetQueryId) { SESSION_MANAGER.closeDataset(statementId, queryId); } else { - SESSION_MANAGER.closeStatement(sessionId, statementId); + SESSION_MANAGER.closeStatement(session, statementId); } return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } else { diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java new file mode 100644 index 0000000000..8d36047509 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/TThreadPoolServerWithContext.java @@ -0,0 +1,150 @@ +package org.apache.iotdb.db.service.thrift; + +import org.apache.thrift.TException; +import org.apache.thrift.TProcessor; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.server.ServerContext; +import org.apache.thrift.server.TServerEventHandler; +import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; +import java.util.concurrent.RejectedExecutionException; + +public class TThreadPoolServerWithContext extends TThreadPoolServer { + private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServerWithContext.class); + + public TThreadPoolServerWithContext(Args args) { + super(args); + } + + @Override + protected void execute() { + while (!stopped_) { + try { + TTransport client = serverTransport_.accept(); + try { + getExecutorService().execute(new WorkerProcess(client)); + } catch (RejectedExecutionException ree) { + if (!stopped_) { + LOGGER.warn( + "ThreadPool is saturated with incoming requests. Closing latest connection."); + } + client.close(); + } + } catch (TTransportException ttx) { + if (!stopped_) { + LOGGER.warn("Transport error occurred during acceptance of message", ttx); + } + } + } + } + + // The following codes are copied from TThreadPoolServer.WorkerProcess + // and we add additional processing for connectionContext + + private class WorkerProcess implements Runnable { + + /** Client that this services. */ + private TTransport client_; + + /** + * Default constructor. + * + * @param client Transport to process + */ + private WorkerProcess(TTransport client) { + client_ = client; + } + + /** Loops on processing a client forever */ + public void run() { + TProcessor processor = null; + TTransport inputTransport = null; + TTransport outputTransport = null; + TProtocol inputProtocol = null; + TProtocol outputProtocol = null; + + Optional<TServerEventHandler> eventHandler = Optional.empty(); + ServerContext connectionContext = null; + + try { + processor = processorFactory_.getProcessor(client_); + inputTransport = inputTransportFactory_.getTransport(client_); + outputTransport = outputTransportFactory_.getTransport(client_); + inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); + outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); + + eventHandler = Optional.ofNullable(getEventHandler()); + + if (eventHandler.isPresent()) { + connectionContext = eventHandler.get().createContext(inputProtocol, outputProtocol); + LOGGER.error("测试一下效果。、。。。。"); + } + + while (true) { + if (Thread.currentThread().isInterrupted()) { + LOGGER.debug("WorkerProcess requested to shutdown"); + break; + } + if (eventHandler.isPresent()) { + eventHandler.get().processContext(connectionContext, inputTransport, outputTransport); + } + // This process cannot be interrupted by Interrupting the Thread. This + // will return once a message has been processed or the socket timeout + // has elapsed, at which point it will return and check the interrupt + // state of the thread. + processor.process(inputProtocol, outputProtocol); + } + } catch (Exception x) { + LOGGER.debug("Error processing request", x); + + // We'll usually receive RuntimeException types here + // Need to unwrap to ascertain real causing exception before we choose to ignore + // Ignore err-logging all transport-level/type exceptions + if (!isIgnorableException(x)) { + // Log the exception at error level and continue + LOGGER.error( + (x instanceof TException ? "Thrift " : "") + + "Error occurred during processing of message.", + x); + } + } finally { + if (eventHandler.isPresent()) { + eventHandler.get().deleteContext(connectionContext, inputProtocol, outputProtocol); + } + if (inputTransport != null) { + inputTransport.close(); + } + if (outputTransport != null) { + outputTransport.close(); + } + if (client_.isOpen()) { + client_.close(); + } + } + } + + private boolean isIgnorableException(Exception x) { + TTransportException tTransportException = null; + + if (x instanceof TTransportException) { + tTransportException = (TTransportException) x; + } else if (x.getCause() instanceof TTransportException) { + tTransportException = (TTransportException) x.getCause(); + } + + if (tTransportException != null) { + switch (tTransportException.getType()) { + case TTransportException.END_OF_FILE: + case TTransportException.TIMED_OUT: + return true; + } + } + return false; + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java index 53cf755ad0..edf98d1774 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java @@ -154,7 +154,8 @@ public class ThriftServiceThread extends Thread { serverTransport = openTransport(bindAddress, port); TThreadPoolServer.Args poolArgs = initSyncedPoolArgs(processor, threadsName, maxWorkerThreads, timeoutSecond); - poolServer = new TThreadPoolServer(poolArgs); + poolServer = new TThreadPoolServerWithContext(poolArgs); + logger.warn("注册EventHandler"); poolServer.setServerEventHandler(serverEventHandler); } catch (TTransportException e) { catchFailedInitialization(e); diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java index 6761c0d1c9..e7e8c3ce4a 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/handler/RPCServiceThriftHandler.java @@ -16,37 +16,92 @@ */ package org.apache.iotdb.db.service.thrift.handler; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.query.control.SessionManager; +import org.apache.iotdb.db.query.control.clientsession.ClientSession; import org.apache.iotdb.db.service.metrics.MetricService; import org.apache.iotdb.db.service.thrift.impl.TSServiceImpl; +import org.apache.iotdb.external.api.thrift.JudgableServerContext; +import org.apache.iotdb.external.api.thrift.ServerContextFactory; +import org.apache.iotdb.rpc.TElasticFramedTransport; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.server.ServerContext; import org.apache.thrift.server.TServerEventHandler; +import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; +import java.net.Socket; import java.util.concurrent.atomic.AtomicLong; public class RPCServiceThriftHandler implements TServerEventHandler { + private static final Logger logger = LoggerFactory.getLogger(RPCServiceThriftHandler.class); private TSServiceImpl serviceImpl; private AtomicLong thriftConnectionNumber = new AtomicLong(0); + private ServerContextFactory factory = null; + public RPCServiceThriftHandler(TSServiceImpl serviceImpl) { this.serviceImpl = serviceImpl; MetricService.getInstance() .addMetricSet(new RPCServiceThriftHandlerMetrics(thriftConnectionNumber)); + String factoryClass = + IoTDBDescriptor.getInstance() + .getConfig() + .getCustomizedProperties() + .getProperty("rpc_service_thrift_handler_context_class"); + if (factoryClass != null) { + try { + factory = (ServerContextFactory) Class.forName(factoryClass).newInstance(); + } catch (Exception e) { + logger.warn( + "configuration announced ServerContextFactory {}, but it is not found in classpath", + factoryClass); + factory = null; + } + } } @Override - public ServerContext createContext(TProtocol arg0, TProtocol arg1) { + public ServerContext createContext(TProtocol in, TProtocol out) { + logger.info("创建了连接"); thriftConnectionNumber.incrementAndGet(); + Socket socket = + ((TSocket) ((TElasticFramedTransport) in.getTransport()).getSocket()).getSocket(); + logger.info( + "in local: {}:{}, remote: {}, default: {}", + socket.getLocalSocketAddress(), + socket.getLocalPort(), + socket.getRemoteSocketAddress(), + socket.getPort()); + socket = ((TSocket) ((TElasticFramedTransport) out.getTransport()).getSocket()).getSocket(); + logger.info( + "out local: {}:{}, remote: {}, default: {}", + socket.getLocalSocketAddress(), + socket.getLocalPort(), + socket.getRemoteSocketAddress(), + socket.getPort()); + getSessionManager() + .registerSession(new ClientSession((InetSocketAddress) socket.getRemoteSocketAddress())); + if (factory != null) { + JudgableServerContext context = factory.newServerContext(); + // TODO + return context; + } + return null; } @Override public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) { + logger.info("移除了连接"); // release query resources. serviceImpl.handleClientExit(); thriftConnectionNumber.decrementAndGet(); + getSessionManager().removeCurrSession(); } @Override @@ -58,4 +113,14 @@ public class RPCServiceThriftHandler implements TServerEventHandler { public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) { // nothing } + + /** + * get the SessionManager Instance. <br> + * in v0.13, Cluster mode uses different SessionManager instance... + * + * @return + */ + protected SessionManager getSessionManager() { + return SessionManager.getInstance(); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java index 7febf56420..e2467d0d77 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/InfluxDBServiceImpl.java @@ -30,6 +30,8 @@ import org.apache.iotdb.db.protocol.influxdb.meta.InfluxDBMetaManager; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan; +import org.apache.iotdb.db.query.control.SessionManager; +import org.apache.iotdb.db.query.control.clientsession.IClientSession; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.service.basic.BasicOpenSessionResp; import org.apache.iotdb.db.service.basic.ServiceProvider; @@ -59,6 +61,8 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface { private final InfluxDBMetaManager metaManager; + private SessionManager sessionManager = SessionManager.getInstance(); + public InfluxDBServiceImpl() { serviceProvider = IoTDB.serviceProvider; metaManager = InfluxDBMetaManager.getInstance(); @@ -66,9 +70,14 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface { @Override public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException { + IClientSession session = sessionManager.getCurrSession(); BasicOpenSessionResp basicOpenSessionResp = - serviceProvider.openSession( - req.username, req.password, req.zoneId, TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3); + serviceProvider.login( + session, + req.username, + req.password, + req.zoneId, + TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3); return new TSOpenSessionResp() .setStatus( RpcUtils.getInfluxDBStatus( @@ -78,15 +87,17 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface { @Override public TSStatus closeSession(TSCloseSessionReq req) { + IClientSession session = sessionManager.getCurrSession(); return new TSStatus( - !serviceProvider.closeSession(req.sessionId) + !serviceProvider.closeSession(session) ? RpcUtils.getInfluxDBStatus(TSStatusCode.NOT_LOGIN_ERROR) : RpcUtils.getInfluxDBStatus(TSStatusCode.SUCCESS_STATUS)); } @Override public TSStatus writePoints(TSWritePointsReq req) { - TSStatus loginStatus = checkLoginStatus(req.sessionId); + IClientSession session = sessionManager.getCurrSession(); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } @@ -97,7 +108,7 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface { IoTDBPoint iotdbPoint = new IoTDBPoint(req.database, point, metaManager); try { InsertRowPlan plan = iotdbPoint.convertToInsertRowPlan(); - TSStatus tsStatus = executeNonQueryPlan(plan, req.sessionId); + TSStatus tsStatus = executeNonQueryPlan(session, plan, req.sessionId); if (executeCode == TSStatusCode.SUCCESS_STATUS.getStatusCode() && tsStatus.getCode() == TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) { executeCode = tsStatus.getCode(); @@ -118,11 +129,11 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface { return tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode(); } - private TSStatus checkLoginStatus(long sessionId) { - if (!serviceProvider.checkLogin(sessionId)) { + private TSStatus checkLoginStatus(IClientSession session) { + if (!serviceProvider.checkLogin(session)) { return getNotLoggedInStatus(); } - if (serviceProvider.checkSessionTimeout(sessionId)) { + if (serviceProvider.checkSessionTimeout(session)) { return RpcUtils.getInfluxDBStatus( TSStatusCode.SESSION_TIMEOUT.getStatusCode(), "Session timeout"); } @@ -131,14 +142,15 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface { @Override public TSStatus createDatabase(TSCreateDatabaseReq req) throws TException { - TSStatus loginStatus = checkLoginStatus(req.sessionId); + IClientSession session = sessionManager.getCurrSession(); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } try { SetStorageGroupPlan setStorageGroupPlan = new SetStorageGroupPlan(new PartialPath("root." + req.getDatabase())); - return executeNonQueryPlan(setStorageGroupPlan, req.getSessionId()); + return executeNonQueryPlan(session, setStorageGroupPlan, req.getSessionId()); } catch (IllegalPathException | QueryProcessException | StorageGroupNotSetException @@ -152,9 +164,9 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface { } public void handleClientExit() { - Long sessionId = ServiceProvider.SESSION_MANAGER.getCurrSessionId(); - if (sessionId != null) { - closeSession(new TSCloseSessionReq(sessionId)); + IClientSession session = sessionManager.getCurrSession(); + if (session != null) { + closeSession(new TSCloseSessionReq(session.getId())); } } @@ -164,10 +176,10 @@ public class InfluxDBServiceImpl implements InfluxDBService.Iface { "Log in failed. Either you are not authorized or the session has timed out."); } - private TSStatus executeNonQueryPlan(PhysicalPlan plan, long sessionId) + private TSStatus executeNonQueryPlan(IClientSession session, PhysicalPlan plan, long sessionId) throws QueryProcessException, StorageGroupNotSetException, StorageEngineException { org.apache.iotdb.service.rpc.thrift.TSStatus status = - serviceProvider.checkAuthority(plan, sessionId); + serviceProvider.checkAuthority(plan, session); if (status == null) { status = serviceProvider.executeNonQuery(plan) diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java index 7e862e532f..5f0793562f 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java @@ -65,6 +65,7 @@ import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan; import org.apache.iotdb.db.qp.physical.sys.ShowQueryProcesslistPlan; import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan; import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.clientsession.IClientSession; import org.apache.iotdb.db.query.control.tracing.TracingConstant; import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet; import org.apache.iotdb.db.query.dataset.DirectNonAlignDataSet; @@ -178,7 +179,7 @@ public class TSServiceImpl implements TSIService.Iface { private PhysicalPlan plan; private final long queryStartTime; - private final long sessionId; + private final IClientSession session; private final String statement; private final long statementId; private final long timeout; @@ -194,7 +195,7 @@ public class TSServiceImpl implements TSIService.Iface { public QueryTask( PhysicalPlan plan, long queryStartTime, - long sessionId, + IClientSession session, String statement, long statementId, long timeout, @@ -203,7 +204,7 @@ public class TSServiceImpl implements TSIService.Iface { boolean enableRedirectQuery) { this.plan = plan; this.queryStartTime = queryStartTime; - this.sessionId = sessionId; + this.session = session; this.statement = statement; this.statementId = statementId; this.timeout = timeout; @@ -214,11 +215,11 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSExecuteStatementResp call() throws Exception { - String username = SESSION_MANAGER.getUsername(sessionId); + String username = session.getUsername(); plan.setLoginUserName(username); QUERY_FREQUENCY_RECORDER.incrementAndGet(); - AUDIT_LOGGER.debug("Session {} execute Query: {}", sessionId, statement); + AUDIT_LOGGER.debug("Session {} execute Query: {}", session, statement); final long queryId = SESSION_MANAGER.requestQueryId(statementId, true); QueryContext context = @@ -252,13 +253,13 @@ public class TSServiceImpl implements TSIService.Iface { protected class FetchResultsTask implements Callable<TSFetchResultsResp> { - private final long sessionId; + private final IClientSession session; private final long queryId; private final int fetchSize; private final boolean isAlign; - public FetchResultsTask(long sessionId, long queryId, int fetchSize, boolean isAlign) { - this.sessionId = sessionId; + public FetchResultsTask(IClientSession session, long queryId, int fetchSize, boolean isAlign) { + this.session = session; this.queryId = queryId; this.fetchSize = fetchSize; this.isAlign = isAlign; @@ -270,8 +271,7 @@ public class TSServiceImpl implements TSIService.Iface { TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); try { if (isAlign) { - TSQueryDataSet result = - fillRpcReturnData(fetchSize, queryDataSet, SESSION_MANAGER.getUsername(sessionId)); + TSQueryDataSet result = fillRpcReturnData(fetchSize, queryDataSet, session.getUsername()); boolean hasResultSet = result.bufferForTime().limit() != 0; if (!hasResultSet) { SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId); @@ -281,8 +281,7 @@ public class TSServiceImpl implements TSIService.Iface { resp.setIsAlign(true); } else { TSQueryNonAlignDataSet nonAlignResult = - fillRpcNonAlignReturnData( - fetchSize, queryDataSet, SESSION_MANAGER.getUsername(sessionId)); + fillRpcNonAlignReturnData(fetchSize, queryDataSet, session.getUsername()); boolean hasResultSet = false; for (ByteBuffer timeBuffer : nonAlignResult.getTimeList()) { if (timeBuffer.limit() != 0) { @@ -323,8 +322,13 @@ public class TSServiceImpl implements TSIService.Iface { public TSOpenSessionResp openSession(TSOpenSessionReq req) throws TException { IoTDBConstant.ClientVersion clientVersion = parseClientVersion(req); BasicOpenSessionResp openSessionResp = - serviceProvider.openSession( - req.username, req.password, req.zoneId, req.client_protocol, clientVersion); + serviceProvider.login( + SESSION_MANAGER.getCurrSession(), + req.username, + req.password, + req.zoneId, + req.client_protocol, + clientVersion); TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage()); TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION); return resp.setSessionId(openSessionResp.getSessionId()); @@ -341,7 +345,7 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus closeSession(TSCloseSessionReq req) { return new TSStatus( - !serviceProvider.closeSession(req.sessionId) + !serviceProvider.closeSession(SESSION_MANAGER.getCurrSession()) ? RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR) : RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); } @@ -355,13 +359,18 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus closeOperation(TSCloseOperationReq req) { return serviceProvider.closeOperation( - req.sessionId, req.queryId, req.statementId, req.isSetStatementId(), req.isSetQueryId()); + SESSION_MANAGER.getCurrSession(), + req.queryId, + req.statementId, + req.isSetStatementId(), + req.isSetQueryId()); } @Override public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); TSFetchMetadataResp resp = new TSFetchMetadataResp(); - TSStatus status = checkLoginStatus(req.getSessionId()); + TSStatus status = checkLoginStatus(session); if (isStatusNotSuccess(status)) { return resp.setStatus(status); } @@ -503,10 +512,11 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus executeBatchStatement(TSExecuteBatchStatementReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); long t1 = System.currentTimeMillis(); List<TSStatus> result = new ArrayList<>(); boolean isAllSuccessful = true; - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } @@ -522,10 +532,7 @@ public class TSServiceImpl implements TSIService.Iface { PhysicalPlan physicalPlan = serviceProvider .getPlanner() - .parseSQLToPhysicalPlan( - statement, - SESSION_MANAGER.getZoneId(req.sessionId), - SESSION_MANAGER.getClientVersion(req.sessionId)); + .parseSQLToPhysicalPlan(statement, session.getZoneId(), session.getClientVersion()); if (physicalPlan.isQuery() || physicalPlan.isSelectInto()) { throw new QueryInBatchStatementException(statement); } @@ -539,7 +546,7 @@ public class TSServiceImpl implements TSIService.Iface { index = 0; } - TSStatus status = serviceProvider.checkAuthority(physicalPlan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(physicalPlan, session); if (status != null) { insertRowsPlan.getResults().put(index, status); isAllSuccessful = false; @@ -561,7 +568,7 @@ public class TSServiceImpl implements TSIService.Iface { multiPlan = new CreateMultiTimeSeriesPlan(); executeList.add(multiPlan); } - TSStatus status = serviceProvider.checkAuthority(physicalPlan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(physicalPlan, session); if (status != null) { multiPlan.getResults().put(i, status); isAllSuccessful = false; @@ -586,7 +593,7 @@ public class TSServiceImpl implements TSIService.Iface { executeList.clear(); } long t2 = System.currentTimeMillis(); - TSExecuteStatementResp resp = executeNonQueryStatement(physicalPlan, req.getSessionId()); + TSExecuteStatementResp resp = executeNonQueryStatement(physicalPlan, session); addOperationLatency(Operation.EXECUTE_ONE_SQL_IN_BATCH, t2); result.add(resp.status); if (resp.getStatus().code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { @@ -611,32 +618,24 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); String statement = req.getStatement(); try { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return RpcUtils.getTSExecuteStatementResp(loginStatus); } - long startTime = System.currentTimeMillis(); PhysicalPlan physicalPlan = serviceProvider .getPlanner() - .parseSQLToPhysicalPlan( - statement, - SESSION_MANAGER.getZoneId(req.getSessionId()), - SESSION_MANAGER.getClientVersion(req.sessionId)); + .parseSQLToPhysicalPlan(statement, session.getZoneId(), session.getClientVersion()); if (physicalPlan.isQuery()) { - return submitQueryTask(physicalPlan, startTime, req); + return submitQueryTask(session, physicalPlan, startTime, req); } else { return executeUpdateStatement( - statement, - req.statementId, - physicalPlan, - req.fetchSize, - req.timeout, - req.getSessionId()); + session, statement, req.statementId, physicalPlan, req.fetchSize, req.timeout); } } catch (InterruptedException e) { LOGGER.error(INFO_INTERRUPT_ERROR, req, e); @@ -651,8 +650,9 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); try { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return RpcUtils.getTSExecuteStatementResp(loginStatus); } @@ -661,13 +661,10 @@ public class TSServiceImpl implements TSIService.Iface { PhysicalPlan physicalPlan = serviceProvider .getPlanner() - .parseSQLToPhysicalPlan( - statement, - SESSION_MANAGER.getZoneId(req.sessionId), - SESSION_MANAGER.getClientVersion(req.sessionId)); + .parseSQLToPhysicalPlan(statement, session.getZoneId(), session.getClientVersion()); if (physicalPlan.isQuery()) { - return submitQueryTask(physicalPlan, startTime, req); + return submitQueryTask(session, physicalPlan, startTime, req); } else { return RpcUtils.getTSExecuteStatementResp( TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement."); @@ -687,8 +684,9 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSExecuteStatementResp executeRawDataQuery(TSRawDataQueryReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); try { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return RpcUtils.getTSExecuteStatementResp(loginStatus); } @@ -696,10 +694,7 @@ public class TSServiceImpl implements TSIService.Iface { PhysicalPlan physicalPlan = serviceProvider .getPlanner() - .rawDataQueryReqToPhysicalPlan( - req, - SESSION_MANAGER.getZoneId(req.sessionId), - SESSION_MANAGER.getClientVersion(req.sessionId)); + .rawDataQueryReqToPhysicalPlan(req, session.getZoneId(), session.getClientVersion()); if (physicalPlan.isQuery()) { Future<TSExecuteStatementResp> resp = @@ -708,7 +703,7 @@ public class TSServiceImpl implements TSIService.Iface { new QueryTask( physicalPlan, startTime, - req.sessionId, + session, "", req.statementId, CONFIG.getQueryTimeoutThreshold(), @@ -733,8 +728,9 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSExecuteStatementResp executeLastDataQuery(TSLastDataQueryReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); try { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return RpcUtils.getTSExecuteStatementResp(loginStatus); } @@ -742,10 +738,7 @@ public class TSServiceImpl implements TSIService.Iface { PhysicalPlan physicalPlan = serviceProvider .getPlanner() - .lastDataQueryReqToPhysicalPlan( - req, - SESSION_MANAGER.getZoneId(req.sessionId), - SESSION_MANAGER.getClientVersion(req.sessionId)); + .lastDataQueryReqToPhysicalPlan(req, session.getZoneId(), session.getClientVersion()); if (physicalPlan.isQuery()) { Future<TSExecuteStatementResp> resp = @@ -754,7 +747,7 @@ public class TSServiceImpl implements TSIService.Iface { new QueryTask( physicalPlan, startTime, - req.sessionId, + session, "", req.statementId, CONFIG.getQueryTimeoutThreshold(), @@ -778,8 +771,9 @@ public class TSServiceImpl implements TSIService.Iface { } private TSExecuteStatementResp submitQueryTask( - PhysicalPlan physicalPlan, long startTime, TSExecuteStatementReq req) throws Exception { - TSStatus status = serviceProvider.checkAuthority(physicalPlan, req.getSessionId()); + IClientSession session, PhysicalPlan physicalPlan, long startTime, TSExecuteStatementReq req) + throws Exception { + TSStatus status = serviceProvider.checkAuthority(physicalPlan, session); if (status != null) { return new TSExecuteStatementResp(status); } @@ -788,7 +782,7 @@ public class TSServiceImpl implements TSIService.Iface { new QueryTask( physicalPlan, startTime, - req.sessionId, + session, req.statement, req.statementId, req.timeout, @@ -929,15 +923,15 @@ public class TSServiceImpl implements TSIService.Iface { } private TSExecuteStatementResp executeSelectIntoStatement( + IClientSession session, String statement, long statementId, PhysicalPlan physicalPlan, int fetchSize, - long timeout, - long sessionId) + long timeout) throws IoTDBException, TException, SQLException, IOException, InterruptedException, QueryFilterOptimizationException { - TSStatus status = serviceProvider.checkAuthority(physicalPlan, sessionId); + TSStatus status = serviceProvider.checkAuthority(physicalPlan, session); if (status != null) { return new TSExecuteStatementResp(status); } @@ -951,8 +945,7 @@ public class TSServiceImpl implements TSIService.Iface { final QueryPlan queryPlan = selectIntoPlan.getQueryPlan(); QUERY_FREQUENCY_RECORDER.incrementAndGet(); - AUDIT_LOGGER.debug( - "Session {} execute select into: {}", SESSION_MANAGER.getCurrSessionId(), statement); + AUDIT_LOGGER.debug("Session {} execute select into: {}", session, statement); if (queryPlan.isEnableTracing()) { TRACING_MANAGER.setSeriesPathNum(queryId, queryPlan.getPaths().size()); } @@ -970,7 +963,7 @@ public class TSServiceImpl implements TSIService.Iface { if (insertTabletPlans.isEmpty()) { continue; } - TSStatus executionStatus = insertTabletsInternally(insertTabletPlans, sessionId); + TSStatus executionStatus = insertTabletsInternally(session, insertTabletPlans); if (isStatusNotSuccess(executionStatus) && executionStatus.getCode() != TSStatusCode.NEED_REDIRECTION.getStatusCode()) { return RpcUtils.getTSExecuteStatementResp(executionStatus).setQueryId(queryId); @@ -989,11 +982,11 @@ public class TSServiceImpl implements TSIService.Iface { } private TSStatus insertTabletsInternally( - List<InsertTabletPlan> insertTabletPlans, long sessionId) { + IClientSession session, List<InsertTabletPlan> insertTabletPlans) { InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan(); for (int i = 0; i < insertTabletPlans.size(); i++) { InsertTabletPlan insertTabletPlan = insertTabletPlans.get(i); - TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, sessionId); + TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, session); if (status != null) { // not authorized @@ -1009,7 +1002,8 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSFetchResultsResp fetchResults(TSFetchResultsReq req) { try { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + IClientSession session = SESSION_MANAGER.getCurrSession(); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return RpcUtils.getTSFetchResultsResp(loginStatus); } @@ -1020,7 +1014,7 @@ public class TSServiceImpl implements TSIService.Iface { Future<TSFetchResultsResp> resp = QueryTaskManager.getInstance() - .submit(new FetchResultsTask(req.sessionId, req.queryId, req.fetchSize, req.isAlign)); + .submit(new FetchResultsTask(session, req.queryId, req.fetchSize, req.isAlign)); return resp.get(); } catch (InterruptedException e) { LOGGER.error(INFO_INTERRUPT_ERROR, req, e); @@ -1075,7 +1069,8 @@ public class TSServiceImpl implements TSIService.Iface { /** update statement can be: 1. select-into statement 2. non-query statement */ @Override public TSExecuteStatementResp executeUpdateStatement(TSExecuteStatementReq req) { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + IClientSession session = SESSION_MANAGER.getCurrSession(); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return RpcUtils.getTSExecuteStatementResp(loginStatus); } @@ -1084,19 +1079,12 @@ public class TSServiceImpl implements TSIService.Iface { serviceProvider .getPlanner() .parseSQLToPhysicalPlan( - req.statement, - SESSION_MANAGER.getZoneId(req.sessionId), - SESSION_MANAGER.getClientVersion(req.sessionId)); + req.statement, session.getZoneId(), session.getClientVersion()); return physicalPlan.isQuery() ? RpcUtils.getTSExecuteStatementResp( TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is a query statement.") : executeUpdateStatement( - req.statement, - req.statementId, - physicalPlan, - req.fetchSize, - req.timeout, - req.getSessionId()); + session, req.statement, req.statementId, physicalPlan, req.fetchSize, req.timeout); } catch (InterruptedException e) { LOGGER.error(INFO_INTERRUPT_ERROR, req, e); Thread.currentThread().interrupt(); @@ -1112,21 +1100,22 @@ public class TSServiceImpl implements TSIService.Iface { /** update statement can be: 1. select-into statement 2. non-query statement */ private TSExecuteStatementResp executeUpdateStatement( + IClientSession session, String statement, long statementId, PhysicalPlan plan, int fetchSize, - long timeout, - long sessionId) + long timeout) throws TException, SQLException, IoTDBException, IOException, InterruptedException, QueryFilterOptimizationException { return plan.isSelectInto() - ? executeSelectIntoStatement(statement, statementId, plan, fetchSize, timeout, sessionId) - : executeNonQueryStatement(plan, sessionId); + ? executeSelectIntoStatement(session, statement, statementId, plan, fetchSize, timeout) + : executeNonQueryStatement(plan, session); } - private TSExecuteStatementResp executeNonQueryStatement(PhysicalPlan plan, long sessionId) { - TSStatus status = serviceProvider.checkAuthority(plan, sessionId); + private TSExecuteStatementResp executeNonQueryStatement( + PhysicalPlan plan, IClientSession session) { + TSStatus status = serviceProvider.checkAuthority(plan, session); return status != null ? new TSExecuteStatementResp(status) : RpcUtils.getTSExecuteStatementResp(executeNonQueryPlan(plan)) @@ -1134,9 +1123,9 @@ public class TSServiceImpl implements TSIService.Iface { } public void handleClientExit() { - Long sessionId = SESSION_MANAGER.getCurrSessionId(); - if (sessionId != null) { - TSCloseSessionReq req = new TSCloseSessionReq(sessionId); + IClientSession session = SESSION_MANAGER.getCurrSession(); + if (session != null) { + TSCloseSessionReq req = new TSCloseSessionReq(); closeSession(req); } } @@ -1151,7 +1140,7 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSGetTimeZoneResp getTimeZone(long sessionId) { try { - ZoneId zoneId = SESSION_MANAGER.getZoneId(sessionId); + ZoneId zoneId = SESSION_MANAGER.getCurrSession().getZoneId(); return new TSGetTimeZoneResp( RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), zoneId != null ? zoneId.toString() : "Unknown time zone"); @@ -1166,7 +1155,7 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus setTimeZone(TSSetTimeZoneReq req) { try { - SESSION_MANAGER.setTimezone(req.sessionId, req.timeZone); + SESSION_MANAGER.getCurrSession().setZoneId(ZoneId.of(req.timeZone)); return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); } catch (Exception e) { return onNPEOrUnexpectedException( @@ -1202,14 +1191,15 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus insertRecords(TSInsertRecordsReq req) { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + IClientSession session = SESSION_MANAGER.getCurrSession(); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } if (AUDIT_LOGGER.isDebugEnabled()) { AUDIT_LOGGER.debug( "Session {} insertRecords, first device {}, first time {}", - SESSION_MANAGER.getCurrSessionId(), + session, req.prefixPaths.get(0), req.getTimestamps().get(0)); } @@ -1224,7 +1214,7 @@ public class TSServiceImpl implements TSIService.Iface { req.getMeasurementsList().get(i).toArray(new String[0]), req.valuesList.get(i), req.isAligned); - TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(plan, session); if (status != null) { insertRowsPlan.getResults().put(i, status); allCheckSuccess = false; @@ -1254,14 +1244,14 @@ public class TSServiceImpl implements TSIService.Iface { /** * Checking the Login Status. * - * @param sessionId Session id. + * @param session client session. * @return When not login or session timeout, will return error status. */ - private TSStatus checkLoginStatus(long sessionId) { - if (!serviceProvider.checkLogin(sessionId)) { + private TSStatus checkLoginStatus(IClientSession session) { + if (!serviceProvider.checkLogin(session)) { return getNotLoggedInStatus(); } - if (serviceProvider.checkSessionTimeout(sessionId)) { + if (serviceProvider.checkSessionTimeout(session)) { return getSessionTimeoutStatus(); } return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS); @@ -1290,14 +1280,15 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq req) { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + IClientSession session = SESSION_MANAGER.getCurrSession(); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } if (AUDIT_LOGGER.isDebugEnabled()) { AUDIT_LOGGER.debug( "Session {} insertRecords, device {}, first time {}", - SESSION_MANAGER.getCurrSessionId(), + session, req.prefixPath, req.getTimestamps().get(0)); } @@ -1311,7 +1302,7 @@ public class TSServiceImpl implements TSIService.Iface { req.getMeasurementsList(), req.getValuesList(), req.isAligned); - TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(plan, session); statusList.add(status != null ? status : executeNonQueryPlan(plan)); } catch (IoTDBException e) { statusList.add( @@ -1336,14 +1327,15 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq req) { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + IClientSession session = SESSION_MANAGER.getCurrSession(); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } if (AUDIT_LOGGER.isDebugEnabled()) { AUDIT_LOGGER.debug( "Session {} insertRecords, device {}, first time {}", - SESSION_MANAGER.getCurrSessionId(), + session, req.prefixPath, req.getTimestamps().get(0)); } @@ -1359,7 +1351,7 @@ public class TSServiceImpl implements TSIService.Iface { plan.setDataTypes(new TSDataType[plan.getMeasurements().length]); plan.setNeedInferType(true); plan.setAligned(req.isAligned); - TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(plan, session); if (status != null) { insertRowsPlan.getResults().put(i, status); @@ -1394,14 +1386,15 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus insertStringRecords(TSInsertStringRecordsReq req) { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + IClientSession session = SESSION_MANAGER.getCurrSession(); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } if (AUDIT_LOGGER.isDebugEnabled()) { AUDIT_LOGGER.debug( "Session {} insertRecords, first device {}, first time {}", - SESSION_MANAGER.getCurrSessionId(), + session, req.prefixPaths.get(0), req.getTimestamps().get(0)); } @@ -1417,7 +1410,7 @@ public class TSServiceImpl implements TSIService.Iface { plan.setDataTypes(new TSDataType[plan.getMeasurements().length]); plan.setNeedInferType(true); plan.setAligned(req.isAligned); - TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(plan, session); if (status != null) { insertRowsPlan.getResults().put(i, status); @@ -1507,14 +1500,15 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus insertRecord(TSInsertRecordReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); try { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } AUDIT_LOGGER.debug( "Session {} insertRecord, device {}, time {}", - SESSION_MANAGER.getCurrSessionId(), + session, req.getPrefixPath(), req.getTimestamp()); @@ -1525,7 +1519,7 @@ public class TSServiceImpl implements TSIService.Iface { req.getMeasurements().toArray(new String[0]), req.values, req.isAligned); - TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(plan, session); if (status != null) { return status; @@ -1542,14 +1536,15 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus insertStringRecord(TSInsertStringRecordReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); try { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } AUDIT_LOGGER.debug( "Session {} insertRecord, device {}, time {}", - SESSION_MANAGER.getCurrSessionId(), + session, req.getPrefixPath(), req.getTimestamp()); @@ -1561,7 +1556,7 @@ public class TSServiceImpl implements TSIService.Iface { plan.setValues(req.getValues().toArray(new Object[0])); plan.setNeedInferType(true); plan.setAligned(req.isAligned); - TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(plan, session); if (status != null) { return status; @@ -1578,8 +1573,9 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus deleteData(TSDeleteDataReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); try { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } @@ -1591,7 +1587,7 @@ public class TSServiceImpl implements TSIService.Iface { paths.add(new PartialPath(path)); } plan.addPaths(paths); - TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(plan, session); return status != null ? new TSStatus(status) : new TSStatus(executeNonQueryPlan(plan)); } catch (IoTDBException e) { @@ -1604,9 +1600,10 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus insertTablet(TSInsertTabletReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); long t1 = System.currentTimeMillis(); try { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } @@ -1621,7 +1618,7 @@ public class TSServiceImpl implements TSIService.Iface { insertTabletPlan.setRowCount(req.size); insertTabletPlan.setDataTypes(req.types); insertTabletPlan.setAligned(req.isAligned); - TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, session); if (status != null) { return status; @@ -1640,13 +1637,14 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus insertTablets(TSInsertTabletsReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); long t1 = System.currentTimeMillis(); try { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } - return insertTabletsInternally(req); + return insertTabletsInternally(session, req); } catch (IoTDBException e) { return onIoTDBException(e, OperationType.INSERT_TABLETS, e.getErrorCode()); } catch (NullPointerException e) { @@ -1662,6 +1660,7 @@ public class TSServiceImpl implements TSIService.Iface { private InsertTabletPlan constructInsertTabletPlan(TSInsertTabletsReq req, int i) throws IllegalPathException { + IClientSession session = SESSION_MANAGER.getCurrSession(); InsertTabletPlan insertTabletPlan = new InsertTabletPlan(new PartialPath(req.prefixPaths.get(i)), req.measurementsList.get(i)); insertTabletPlan.setTimes( @@ -1682,12 +1681,13 @@ public class TSServiceImpl implements TSIService.Iface { } /** construct one InsertMultiTabletPlan and process it */ - public TSStatus insertTabletsInternally(TSInsertTabletsReq req) throws IllegalPathException { + private TSStatus insertTabletsInternally(IClientSession session, TSInsertTabletsReq req) + throws IllegalPathException { List<InsertTabletPlan> insertTabletPlanList = new ArrayList<>(); InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan(); for (int i = 0; i < req.prefixPaths.size(); i++) { InsertTabletPlan insertTabletPlan = constructInsertTabletPlan(req, i); - TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, session); if (status != null) { // not authorized insertMultiTabletPlan.getResults().put(i, status); @@ -1701,14 +1701,15 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus setStorageGroup(long sessionId, String storageGroup) { + IClientSession session = SESSION_MANAGER.getCurrSession(); try { - TSStatus loginStatus = checkLoginStatus(sessionId); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } SetStorageGroupPlan plan = new SetStorageGroupPlan(new PartialPath(storageGroup)); - TSStatus status = serviceProvider.checkAuthority(plan, sessionId); + TSStatus status = serviceProvider.checkAuthority(plan, session); return status != null ? status : executeNonQueryPlan(plan); } catch (IoTDBException e) { @@ -1721,8 +1722,9 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus deleteStorageGroups(long sessionId, List<String> storageGroups) { + IClientSession session = SESSION_MANAGER.getCurrSession(); try { - TSStatus loginStatus = checkLoginStatus(sessionId); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } @@ -1732,7 +1734,7 @@ public class TSServiceImpl implements TSIService.Iface { storageGroupList.add(new PartialPath(storageGroup)); } DeleteStorageGroupPlan plan = new DeleteStorageGroupPlan(storageGroupList); - TSStatus status = serviceProvider.checkAuthority(plan, sessionId); + TSStatus status = serviceProvider.checkAuthority(plan, session); return status != null ? status : executeNonQueryPlan(plan); } catch (IoTDBException e) { return onIoTDBException(e, OperationType.DELETE_STORAGE_GROUPS, e.getErrorCode()); @@ -1744,14 +1746,14 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus createTimeseries(TSCreateTimeseriesReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); try { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } if (AUDIT_LOGGER.isDebugEnabled()) { - AUDIT_LOGGER.debug( - "Session-{} create timeseries {}", SESSION_MANAGER.getCurrSessionId(), req.getPath()); + AUDIT_LOGGER.debug("Session-{} create timeseries {}", session, req.getPath()); } CreateTimeSeriesPlan plan = @@ -1764,7 +1766,7 @@ public class TSServiceImpl implements TSIService.Iface { req.tags, req.attributes, req.measurementAlias); - TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(plan, session); return status != null ? status : executeNonQueryPlan(plan); } catch (IoTDBException e) { return onIoTDBException(e, OperationType.CREATE_TIMESERIES, e.getErrorCode()); @@ -1776,8 +1778,9 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus createAlignedTimeseries(TSCreateAlignedTimeseriesReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); try { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } @@ -1785,7 +1788,7 @@ public class TSServiceImpl implements TSIService.Iface { if (AUDIT_LOGGER.isDebugEnabled()) { AUDIT_LOGGER.debug( "Session-{} create aligned timeseries {}.{}", - SESSION_MANAGER.getCurrSessionId(), + session, req.getPrefixPath(), req.getMeasurements()); } @@ -1811,7 +1814,7 @@ public class TSServiceImpl implements TSIService.Iface { encodings, compressors, req.measurementAlias); - TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(plan, session); return status != null ? status : executeNonQueryPlan(plan); } catch (IoTDBException e) { return onIoTDBException(e, OperationType.CREATE_ALIGNED_TIMESERIES, e.getErrorCode()); @@ -1824,15 +1827,16 @@ public class TSServiceImpl implements TSIService.Iface { @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning @Override public TSStatus createMultiTimeseries(TSCreateMultiTimeseriesReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); try { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } if (AUDIT_LOGGER.isDebugEnabled()) { AUDIT_LOGGER.debug( "Session-{} create {} timeseries, the first is {}", - SESSION_MANAGER.getCurrSessionId(), + session, req.getPaths().size(), req.getPaths().get(0)); } @@ -1863,7 +1867,7 @@ public class TSServiceImpl implements TSIService.Iface { CreateTimeSeriesPlan plan = new CreateTimeSeriesPlan(); for (int i = 0; i < req.paths.size(); i++) { plan.setPath(new PartialPath(req.paths.get(i))); - TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(plan, session); if (status != null) { // not authorized multiPlan.getResults().put(i, status); @@ -1911,8 +1915,9 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus deleteTimeseries(long sessionId, List<String> paths) { + IClientSession session = SESSION_MANAGER.getCurrSession(); try { - TSStatus loginStatus = checkLoginStatus(sessionId); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } @@ -1922,7 +1927,7 @@ public class TSServiceImpl implements TSIService.Iface { pathList.add(new PartialPath(path)); } DeleteTimeSeriesPlan plan = new DeleteTimeSeriesPlan(pathList); - TSStatus status = serviceProvider.checkAuthority(plan, sessionId); + TSStatus status = serviceProvider.checkAuthority(plan, session); return status != null ? status : executeNonQueryPlan(plan); } catch (IoTDBException e) { return onIoTDBException(e, OperationType.DELETE_TIMESERIES, e.getErrorCode()); @@ -1934,28 +1939,27 @@ public class TSServiceImpl implements TSIService.Iface { @Override public long requestStatementId(long sessionId) { - return SESSION_MANAGER.requestStatementId(sessionId); + IClientSession session = SESSION_MANAGER.getCurrSession(); + return SESSION_MANAGER.requestStatementId(session); } @Override public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) throws TException { + IClientSession session = SESSION_MANAGER.getCurrSession(); try { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } if (AUDIT_LOGGER.isDebugEnabled()) { - AUDIT_LOGGER.debug( - "Session-{} create schema template {}", - SESSION_MANAGER.getCurrSessionId(), - req.getName()); + AUDIT_LOGGER.debug("Session-{} create schema template {}", session, req.getName()); } CreateTemplatePlan plan; // Construct plan from serialized request ByteBuffer buffer = ByteBuffer.wrap(req.getSerializedTemplate()); plan = CreateTemplatePlan.deserializeFromReq(buffer); - TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(plan, session); return status != null ? status : executeNonQueryPlan(plan); } catch (Exception e) { @@ -1966,6 +1970,7 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus appendSchemaTemplate(TSAppendSchemaTemplateReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); int size = req.getMeasurementsSize(); String[] measurements = new String[size]; TSDataType[] dataTypes = new TSDataType[size]; @@ -1982,20 +1987,22 @@ public class TSServiceImpl implements TSIService.Iface { AppendTemplatePlan plan = new AppendTemplatePlan( req.getName(), req.isAligned, measurements, dataTypes, encodings, compressionTypes); - TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(plan, session); return status != null ? status : executeNonQueryPlan(plan); } @Override public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); PruneTemplatePlan plan = new PruneTemplatePlan(req.getName(), Collections.singletonList(req.getPath())); - TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(plan, session); return status != null ? status : executeNonQueryPlan(plan); } @Override public TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); TSQueryTemplateResp resp = new TSQueryTemplateResp(); try { String path; @@ -2044,21 +2051,22 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + IClientSession session = SESSION_MANAGER.getCurrSession(); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } if (AUDIT_LOGGER.isDebugEnabled()) { AUDIT_LOGGER.debug( "Session-{} set device template {}.{}", - SESSION_MANAGER.getCurrSessionId(), + session, req.getTemplateName(), req.getPrefixPath()); } try { SetTemplatePlan plan = new SetTemplatePlan(req.templateName, req.prefixPath); - TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(plan, session); return status != null ? status : executeNonQueryPlan(plan); } catch (IllegalPathException e) { return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode()); @@ -2067,21 +2075,22 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus unsetSchemaTemplate(TSUnsetSchemaTemplateReq req) throws TException { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + IClientSession session = SESSION_MANAGER.getCurrSession(); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } if (AUDIT_LOGGER.isDebugEnabled()) { AUDIT_LOGGER.debug( "Session-{} unset schema template {}.{}", - SESSION_MANAGER.getCurrSessionId(), + session, req.getPrefixPath(), req.getTemplateName()); } try { UnsetTemplatePlan plan = new UnsetTemplatePlan(req.prefixPath, req.templateName); - TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(plan, session); return status != null ? status : executeNonQueryPlan(plan); } catch (IllegalPathException e) { return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode()); @@ -2091,23 +2100,21 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus unsetUsingTemplate(long sessionId, String templateName, String prefixPath) throws TException { - TSStatus loginStatus = checkLoginStatus(sessionId); + IClientSession session = SESSION_MANAGER.getCurrSession(); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } if (AUDIT_LOGGER.isDebugEnabled()) { AUDIT_LOGGER.debug( - "Session-{} unset using schema template {} on {}", - SESSION_MANAGER.getCurrSessionId(), - templateName, - prefixPath); + "Session-{} unset using schema template {} on {}", session, templateName, prefixPath); } try { DeactivateTemplatePlan plan = new DeactivateTemplatePlan(templateName, new PartialPath(prefixPath)); - TSStatus status = serviceProvider.checkAuthority(plan, sessionId); + TSStatus status = serviceProvider.checkAuthority(plan, session); return status != null ? status : executeNonQueryPlan(plan); } catch (MetadataException e) { return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode()); @@ -2116,20 +2123,19 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus setUsingTemplate(TSSetUsingTemplateReq req) throws TException { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + IClientSession session = SESSION_MANAGER.getCurrSession(); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } if (AUDIT_LOGGER.isDebugEnabled()) { AUDIT_LOGGER.debug( - "Session-{} create timeseries of schema template on path {}", - SESSION_MANAGER.getCurrSessionId(), - req.getDstPath()); + "Session-{} create timeseries of schema template on path {}", session, req.getDstPath()); } try { ActivateTemplatePlan plan = new ActivateTemplatePlan(new PartialPath(req.getDstPath())); - TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(plan, session); return status != null ? status : executeNonQueryPlan(plan); } catch (IllegalPathException e) { return onIoTDBException(e, OperationType.EXECUTE_STATEMENT, e.getErrorCode()); @@ -2138,19 +2144,17 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus dropSchemaTemplate(TSDropSchemaTemplateReq req) throws TException { - TSStatus loginStatus = checkLoginStatus(req.getSessionId()); + IClientSession session = SESSION_MANAGER.getCurrSession(); + TSStatus loginStatus = checkLoginStatus(session); if (isStatusNotSuccess(loginStatus)) { return loginStatus; } if (AUDIT_LOGGER.isDebugEnabled()) { - AUDIT_LOGGER.debug( - "Session-{} drop schema template {}.", - SESSION_MANAGER.getCurrSessionId(), - req.getTemplateName()); + AUDIT_LOGGER.debug("Session-{} drop schema template {}.", session, req.getTemplateName()); } DropTemplatePlan plan = new DropTemplatePlan(req.templateName); - TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId()); + TSStatus status = serviceProvider.checkAuthority(plan, session); return status != null ? status : executeNonQueryPlan(plan); } @@ -2164,6 +2168,7 @@ public class TSServiceImpl implements TSIService.Iface { @Override public TSStatus executeOperationSync(TSOperationSyncWriteReq req) { + IClientSession session = SESSION_MANAGER.getCurrSession(); PhysicalPlan physicalPlan; try { ByteBuffer planBuffer = req.physicalPlan;
