This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/useXX in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2fd763a3020a95106947b3c774ac09abada370d7 Author: JackieTien97 <[email protected]> AuthorDate: Mon Jul 8 20:27:49 2024 +0800 Add SessionWrapper --- .../iotdb/itbase/runtime/ClusterTestStatement.java | 4 +- .../org/apache/iotdb/isession/IPooledSession.java | 64 + .../org/apache/iotdb/jdbc/IoTDBConnection.java | 5 +- .../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 4 + .../src/main/java/org/apache/iotdb/jdbc/Utils.java | 51 +- .../main/java/org/apache/iotdb/rpc/RpcUtils.java | 4 + .../java/org/apache/iotdb/session/Session.java | 27 +- .../apache/iotdb/session/SessionConnection.java | 8 +- .../org/apache/iotdb/session/pool/SessionPool.java | 17 +- .../apache/iotdb/session/pool/SessionWrapper.java | 1504 ++++++++++++++++++++ .../session/subscription/SubscriptionSession.java | 9 +- .../SubscriptionSessionConnection.java | 10 +- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 21 + .../thrift-datanode/src/main/thrift/client.thrift | 2 + 14 files changed, 1700 insertions(+), 30 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java index f8c3e38ea57..d99ea24717e 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java @@ -31,6 +31,8 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.List; +import static org.apache.iotdb.rpc.RpcUtils.isUseDatabase; + /** The implementation of {@link ClusterTestStatement} in cluster test. */ public class ClusterTestStatement implements Statement { @@ -190,7 +192,7 @@ public class ClusterTestStatement implements Statement { sql = sql.trim(); boolean result = writeStatement.execute(sql); // if use XXXX, sendRequest to all statements - if (sql.length() > 4 && "use ".equalsIgnoreCase(sql.substring(0, 4))) { + if (isUseDatabase(sql)) { for (Statement readStatement : readStatements) { boolean tmp = readStatement.execute(sql); result = result && tmp; diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java new file mode 100644 index 00000000000..04c25944a97 --- /dev/null +++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.isession; + +import org.apache.iotdb.isession.util.Version; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp; +import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp; + +import org.apache.thrift.TException; +import org.apache.tsfile.write.record.Tablet; + +public interface IPooledSession extends AutoCloseable { + + Version getVersion(); + + int getFetchSize(); + + void close() throws IoTDBConnectionException; + + String getTimeZone(); + + SessionDataSet executeQueryStatement(String sql) + throws StatementExecutionException, IoTDBConnectionException; + + SessionDataSet executeQueryStatement(String sql, long timeoutInMs) + throws StatementExecutionException, IoTDBConnectionException; + + void executeNonQueryStatement(String sql) + throws IoTDBConnectionException, StatementExecutionException; + + String getTimestampPrecision() throws TException; + + void insertTablet(Tablet tablet) throws StatementExecutionException, IoTDBConnectionException; + + boolean isEnableQueryRedirection(); + + boolean isEnableRedirection(); + + TSBackupConfigurationResp getBackupConfiguration() + throws IoTDBConnectionException, StatementExecutionException; + + TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException; + + long getQueryTimeout(); +} diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java index f62ed9799fc..721a1217ca2 100644 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java +++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java @@ -62,7 +62,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.Executor; -import javax.sql.DataSource; public class IoTDBConnection implements Connection { @@ -595,4 +594,8 @@ public class IoTDBConnection implements Connection { public ServerProperties getServerProperties() throws TException { return getClient().getProperties(); } + + protected void changeDefaultDatabase(String database) { + params.setDb(database); + } } diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java index eaa080aaefe..4dd1d505587 100644 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java +++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java @@ -321,6 +321,10 @@ public class IoTDBStatement implements Statement { throw new IoTDBSQLException(e.getMessage(), execResp.getStatus()); } + if (execResp.isSetDatabase()) { + connection.changeDefaultDatabase(execResp.getDatabase()); + } + if (execResp.isSetColumns()) { queryId = execResp.getQueryId(); if (execResp.queryResult == null) { diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java index 41283e24e42..7f90df75145 100644 --- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java +++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java @@ -33,10 +33,10 @@ public class Utils { "squid:S5843", "squid:S5998" }) // Regular expressions should not be too complicated - static final Pattern SUFFIX_URL_PATTERN = Pattern.compile("([0-9]{1,5})(/|\\\\?.*=.*(&.*=.*)*)?"); + static final Pattern SUFFIX_URL_PATTERN = Pattern.compile("(/|\\\\?.*=.*(&.*=.*)*)?"); static final String COLON = ":"; - static final String SLASH = "/"; + static final char SLASH = '/'; static final String PARAMETER_SEPARATOR = "?"; static final String RPC_COMPRESS = "rpc_compress"; @@ -59,29 +59,42 @@ public class Utils { String subURL = url.substring(Config.IOTDB_URL_PREFIX.length()); int i = subURL.lastIndexOf(COLON); host = subURL.substring(0, i); + params.setHost(host); suffixURL = subURL.substring(i + 1); - matcher = SUFFIX_URL_PATTERN.matcher(suffixURL); - if (matcher.matches() && parseUrlParam(subURL, info)) { - isUrlLegal = true; + // parse port + int port = 0; + for (; i < subURL.length() && Character.isDigit(subURL.charAt(i)); i++) { + port = port * 10 + (subURL.charAt(i) - '0'); + } + // legal port + if (port >= 1 && port <= 65535) { + params.setPort(port); + // parse database + if (subURL.charAt(i) == SLASH) { + int endIndex = subURL.indexOf(PARAMETER_SEPARATOR, i + 1); + String database; + if (endIndex <= i + 1) { + database = subURL.substring(i + 1); + suffixURL = ""; + } else { + database = subURL.substring(i + 1, endIndex); + suffixURL = subURL.substring(endIndex); + } + params.setDb(database); + } + + matcher = SUFFIX_URL_PATTERN.matcher(suffixURL); + if (matcher.matches() && parseUrlParam(subURL, info)) { + isUrlLegal = true; + } } } if (!isUrlLegal) { throw new IoTDBURLException( - "Error url format, url should be jdbc:iotdb://anything:port/ or jdbc:iotdb://anything:port?property1=value1&property2=value2"); + "Error url format, url should be jdbc:iotdb://anything:port/[database] or jdbc:iotdb://anything:port[/database]?property1=value1&property2=value2"); } - params.setHost(host); - - // parse port - String port = suffixURL; - if (suffixURL.contains(PARAMETER_SEPARATOR)) { - port = suffixURL.split("\\" + PARAMETER_SEPARATOR)[0]; - } else if (suffixURL.contains(SLASH)) { - port = suffixURL.substring(0, suffixURL.length() - 1); - } - params.setPort(Integer.parseInt(port)); - if (info.containsKey(Config.AUTH_USER)) { params.setUsername(info.getProperty(Config.AUTH_USER)); } @@ -120,9 +133,6 @@ public class Utils { if (info.containsKey(Config.SQL_DIALECT)) { params.setSqlDialect(info.getProperty(Config.SQL_DIALECT)); } - if (info.containsKey(Config.DATABASE)) { - params.setDb(info.getProperty(Config.DATABASE)); - } return params; } @@ -162,7 +172,6 @@ public class Utils { case Config.VERSION: case Config.NETWORK_TIMEOUT: case Config.SQL_DIALECT: - case Config.DATABASE: info.put(key, value); break; case Config.TIME_ZONE: diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java index e48054abb19..06aac75aa66 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java @@ -342,4 +342,8 @@ public class RpcUtils { : new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()) .setMessage(failedStatus.toString()); } + + public static boolean isUseDatabase(String sql) { + return sql.length() > 4 && "use ".equalsIgnoreCase(sql.substring(0, 4)); + } } diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java index 18a9142335c..5e0d205a3b5 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java @@ -89,6 +89,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -176,7 +177,8 @@ public class Session implements ISession { protected String sqlDialect = SessionConfig.SQL_DIALECT; - protected String database; + // may be null + protected volatile String database; private static final String REDIRECT_TWICE = "redirect twice"; @@ -606,7 +608,14 @@ public class Session implements ISession { session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs, sqlDialect, database); } return new SessionConnection( - session, endpoint, zoneId, availableNodes, maxRetryCount, retryIntervalInMs, sqlDialect, database); + session, + endpoint, + zoneId, + availableNodes, + maxRetryCount, + retryIntervalInMs, + sqlDialect, + database); } @Override @@ -939,7 +948,13 @@ public class Session implements ISession { @Override public void executeNonQueryStatement(String sql) throws IoTDBConnectionException, StatementExecutionException { + String previousDB = database; defaultSessionConnection.executeNonQueryStatement(sql); + if (!Objects.equals(previousDB, database) && endPointToSessionConnection != null) { + for (SessionConnection sessionConnection : endPointToSessionConnection.values()) { + sessionConnection.executeNonQueryStatement(sql); + } + } } /** @@ -3829,6 +3844,14 @@ public class Session implements ISession { return defaultSessionConnection.fetchAllConnections(); } + protected void changeDatabase(String database) { + this.database = database; + } + + public String getDatabase() { + return database; + } + public static class Builder { private String host = SessionConfig.DEFAULT_HOST; private int rpcPort = SessionConfig.DEFAULT_PORT; diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index c6682ed8f2a..37f9f72f39d 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -206,7 +206,7 @@ public class SessionConnection { openReq.putToConfiguration("version", session.version.toString()); openReq.putToConfiguration("sql_dialect", sqlDialect); if (database != null) { - openReq.putToConfiguration("database", database); + openReq.putToConfiguration("db", database); } try { @@ -511,7 +511,11 @@ public class SessionConnection { throws TException { request.setSessionId(sessionId); request.setStatementId(statementId); - return client.executeUpdateStatementV2(request).status; + TSExecuteStatementResp resp = client.executeUpdateStatementV2(request); + if (resp.isSetDatabase()) { + session.changeDatabase(resp.getDatabase()); + } + return resp.status; } protected SessionDataSet executeRawDataQuery( diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java index 8eeeb950f51..6979c314787 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java @@ -57,6 +57,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import static org.apache.iotdb.rpc.RpcUtils.isUseDatabase; + /** * SessionPool is a wrapper of a Session Set. Using SessionPool, the user do not need to consider * how to reuse a session connection. Even if the session is disconnected, the session pool can @@ -155,6 +157,7 @@ public class SessionPool implements ISessionPool { protected String sqlDialect = SessionConfig.SQL_DIALECT; + // may be null protected String database; private static final String INSERT_RECORD_FAIL = "insertRecord failed"; @@ -720,7 +723,7 @@ public class SessionPool implements ISessionPool { } @SuppressWarnings({"squid:S2446"}) - private void putBack(ISession session) { + protected void putBack(ISession session) { queue.push(session); synchronized (this) { // we do not need to notifyAll as any waited thread can continue to work after waked up. @@ -834,6 +837,11 @@ public class SessionPool implements ISessionPool { } } + protected void cleanSessionAndMayThrowConnectionException(ISession session) { + closeSession(session); + tryConstructNewSession(); + } + /** * insert the data of a device. For each timestamp, the number of measurements is the same. * @@ -3036,6 +3044,13 @@ public class SessionPool implements ISessionPool { @Override public void executeNonQueryStatement(String sql) throws StatementExecutionException, IoTDBConnectionException { + + // use XXX is forbidden in SessionPool.executeNonQueryStatement + if (isUseDatabase(sql)) { + throw new IllegalArgumentException( + String.format("SessionPool doesn't support executing %s directly", sql)); + } + ISession session = getSession(); try { session.executeNonQueryStatement(sql); diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionWrapper.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionWrapper.java new file mode 100644 index 00000000000..1214859692a --- /dev/null +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionWrapper.java @@ -0,0 +1,1504 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.session.pool; + +import org.apache.iotdb.common.rpc.thrift.TAggregationType; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.isession.INodeSupplier; +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.isession.template.Template; +import org.apache.iotdb.isession.util.Version; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp; +import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp; +import org.apache.iotdb.session.Session; + +import org.apache.thrift.TException; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.write.record.Tablet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * used for SessionPool.getSession need to do some other things like calling + * cleanSessionAndMayThrowConnectionException in SessionPool while encountering connection exception + * only need to putBack to SessionPool while closing + */ +public class SessionWrapper implements ISession { + + private static final Logger LOGGER = LoggerFactory.getLogger(SessionWrapper.class); + + private Session session; + + private final SessionPool sessionPool; + + private final AtomicBoolean closed; + + public SessionWrapper(Session session, SessionPool sessionPool) { + this.session = session; + this.sessionPool = sessionPool; + this.closed = new AtomicBoolean(false); + } + + @Override + public Version getVersion() { + return session.getVersion(); + } + + @Override + public void setVersion(Version version) { + session.setVersion(version); + } + + @Override + public int getFetchSize() { + return session.getFetchSize(); + } + + @Override + public void setFetchSize(int fetchSize) { + session.setFetchSize(fetchSize); + } + + @Override + public void open() throws IoTDBConnectionException { + throw new UnsupportedOperationException(); + } + + @Override + public void open(boolean enableRPCCompression) throws IoTDBConnectionException { + throw new UnsupportedOperationException(); + } + + @Override + public void open(boolean enableRPCCompression, int connectionTimeoutInMs) + throws IoTDBConnectionException { + throw new UnsupportedOperationException(); + } + + @Override + public void open( + boolean enableRPCCompression, + int connectionTimeoutInMs, + Map<String, TEndPoint> deviceIdToEndpoint, + INodeSupplier nodeSupplier) + throws IoTDBConnectionException { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IoTDBConnectionException { + if (closed.compareAndSet(false, true)) { + if (!Objects.equals(session.getDatabase(), sessionPool.database) + && sessionPool.database != null) { + try { + session.executeNonQueryStatement("use " + sessionPool.database); + } catch (StatementExecutionException e) { + LOGGER.warn( + "Failed to change back database by executing: use " + sessionPool.database, e); + } + } + sessionPool.putBack(session); + session = null; + } + } + + @Override + public String getTimeZone() { + return session.getTimeZone(); + } + + @Override + public void setTimeZone(String zoneId) + throws StatementExecutionException, IoTDBConnectionException { + try { + session.setTimeZone(zoneId); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void setTimeZoneOfSession(String zoneId) { + session.setTimeZoneOfSession(zoneId); + } + + @Override + public void setStorageGroup(String storageGroup) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.setStorageGroup(storageGroup); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void deleteStorageGroup(String storageGroup) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.deleteStorageGroup(storageGroup); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void deleteStorageGroups(List<String> storageGroups) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.deleteStorageGroups(storageGroups); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void createDatabase(String database) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.createDatabase(database); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void deleteDatabase(String database) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.deleteDatabase(database); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void deleteDatabases(List<String> databases) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.deleteDatabases(databases); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void createTimeseries( + String path, TSDataType dataType, TSEncoding encoding, CompressionType compressor) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.createTimeseries(path, dataType, encoding, compressor); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void createTimeseries( + String path, + TSDataType dataType, + TSEncoding encoding, + CompressionType compressor, + Map<String, String> props, + Map<String, String> tags, + Map<String, String> attributes, + String measurementAlias) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.createTimeseries( + path, dataType, encoding, compressor, props, tags, attributes, measurementAlias); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void createAlignedTimeseries( + String deviceId, + List<String> measurements, + List<TSDataType> dataTypes, + List<TSEncoding> encodings, + List<CompressionType> compressors, + List<String> measurementAliasList) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.createAlignedTimeseries( + deviceId, measurements, dataTypes, encodings, compressors, measurementAliasList); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void createAlignedTimeseries( + String deviceId, + List<String> measurements, + List<TSDataType> dataTypes, + List<TSEncoding> encodings, + List<CompressionType> compressors, + List<String> measurementAliasList, + List<Map<String, String>> tagsList, + List<Map<String, String>> attributesList) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.createAlignedTimeseries( + deviceId, + measurements, + dataTypes, + encodings, + compressors, + measurementAliasList, + tagsList, + attributesList); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void createMultiTimeseries( + List<String> paths, + List<TSDataType> dataTypes, + List<TSEncoding> encodings, + List<CompressionType> compressors, + List<Map<String, String>> propsList, + List<Map<String, String>> tagsList, + List<Map<String, String>> attributesList, + List<String> measurementAliasList) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.createMultiTimeseries( + paths, + dataTypes, + encodings, + compressors, + propsList, + tagsList, + attributesList, + measurementAliasList); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public boolean checkTimeseriesExists(String path) + throws IoTDBConnectionException, StatementExecutionException { + try { + return session.checkTimeseriesExists(path); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public SessionDataSet executeQueryStatement(String sql) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.executeQueryStatement(sql); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public SessionDataSet executeQueryStatement(String sql, long timeoutInMs) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.executeQueryStatement(sql, timeoutInMs); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void executeNonQueryStatement(String sql) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.executeNonQueryStatement(sql); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public SessionDataSet executeRawDataQuery( + List<String> paths, long startTime, long endTime, long timeOut) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.executeRawDataQuery(paths, startTime, endTime, timeOut); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.executeRawDataQuery(paths, startTime, endTime); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public SessionDataSet executeLastDataQuery(List<String> paths, long lastTime) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.executeLastDataQuery(paths, lastTime); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public SessionDataSet executeLastDataQuery(List<String> paths, long lastTime, long timeOut) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.executeLastDataQuery(paths, lastTime, timeOut); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public SessionDataSet executeLastDataQuery(List<String> paths) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.executeLastDataQuery(paths); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public SessionDataSet executeLastDataQueryForOneDevice( + String db, String device, List<String> sensors, boolean isLegalPathNodes) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.executeLastDataQueryForOneDevice(db, device, sensors, isLegalPathNodes); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public SessionDataSet executeAggregationQuery( + List<String> paths, List<TAggregationType> aggregations) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.executeAggregationQuery(paths, aggregations); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public SessionDataSet executeAggregationQuery( + List<String> paths, List<TAggregationType> aggregations, long startTime, long endTime) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.executeAggregationQuery(paths, aggregations, startTime, endTime); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public SessionDataSet executeAggregationQuery( + List<String> paths, + List<TAggregationType> aggregations, + long startTime, + long endTime, + long interval) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.executeAggregationQuery(paths, aggregations, startTime, endTime, interval); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public SessionDataSet executeAggregationQuery( + List<String> paths, + List<TAggregationType> aggregations, + long startTime, + long endTime, + long interval, + long slidingStep) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.executeAggregationQuery( + paths, aggregations, startTime, endTime, interval, slidingStep); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertRecord( + String deviceId, + long time, + List<String> measurements, + List<TSDataType> types, + Object... values) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertRecord(deviceId, time, measurements, types, values); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertRecord( + String deviceId, + long time, + List<String> measurements, + List<TSDataType> types, + List<Object> values) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertRecord(deviceId, time, measurements, types, values); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertAlignedRecord( + String deviceId, + long time, + List<String> measurements, + List<TSDataType> types, + List<Object> values) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertAlignedRecord(deviceId, time, measurements, types, values); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertRecord( + String deviceId, long time, List<String> measurements, List<String> values) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertRecord(deviceId, time, measurements, values); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public String getTimestampPrecision() throws TException { + try { + return session.getTimestampPrecision(); + } catch (TException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertAlignedRecord( + String deviceId, long time, List<String> measurements, List<String> values) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertAlignedRecord(deviceId, time, measurements, values); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertRecords( + List<String> deviceIds, + List<Long> times, + List<List<String>> measurementsList, + List<List<String>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertRecords(deviceIds, times, measurementsList, valuesList); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertAlignedRecords( + List<String> deviceIds, + List<Long> times, + List<List<String>> measurementsList, + List<List<String>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertAlignedRecords(deviceIds, times, measurementsList, valuesList); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertRecords( + List<String> deviceIds, + List<Long> times, + List<List<String>> measurementsList, + List<List<TSDataType>> typesList, + List<List<Object>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertRecords(deviceIds, times, measurementsList, typesList, valuesList); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertAlignedRecords( + List<String> deviceIds, + List<Long> times, + List<List<String>> measurementsList, + List<List<TSDataType>> typesList, + List<List<Object>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertAlignedRecords(deviceIds, times, measurementsList, typesList, valuesList); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertRecordsOfOneDevice( + String deviceId, + List<Long> times, + List<List<String>> measurementsList, + List<List<TSDataType>> typesList, + List<List<Object>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertRecordsOfOneDevice( + String deviceId, + List<Long> times, + List<List<String>> measurementsList, + List<List<TSDataType>> typesList, + List<List<Object>> valuesList, + boolean haveSorted) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertRecordsOfOneDevice( + deviceId, times, measurementsList, typesList, valuesList, haveSorted); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertStringRecordsOfOneDevice( + String deviceId, + List<Long> times, + List<List<String>> measurementsList, + List<List<String>> valuesList, + boolean haveSorted) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertStringRecordsOfOneDevice( + deviceId, times, measurementsList, valuesList, haveSorted); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertStringRecordsOfOneDevice( + String deviceId, + List<Long> times, + List<List<String>> measurementsList, + List<List<String>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertAlignedRecordsOfOneDevice( + String deviceId, + List<Long> times, + List<List<String>> measurementsList, + List<List<TSDataType>> typesList, + List<List<Object>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertAlignedRecordsOfOneDevice( + deviceId, times, measurementsList, typesList, valuesList); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertAlignedRecordsOfOneDevice( + String deviceId, + List<Long> times, + List<List<String>> measurementsList, + List<List<TSDataType>> typesList, + List<List<Object>> valuesList, + boolean haveSorted) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertAlignedRecordsOfOneDevice( + deviceId, times, measurementsList, typesList, valuesList, haveSorted); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertAlignedStringRecordsOfOneDevice( + String deviceId, + List<Long> times, + List<List<String>> measurementsList, + List<List<String>> valuesList, + boolean haveSorted) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertAlignedStringRecordsOfOneDevice( + deviceId, times, measurementsList, valuesList, haveSorted); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertAlignedStringRecordsOfOneDevice( + String deviceId, + List<Long> times, + List<List<String>> measurementsList, + List<List<String>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertAlignedStringRecordsOfOneDevice(deviceId, times, measurementsList, valuesList); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertTablet(Tablet tablet) + throws StatementExecutionException, IoTDBConnectionException { + try { + session.insertTablet(tablet); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertTablet(Tablet tablet, boolean sorted) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertTablet(tablet, sorted); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertAlignedTablet(Tablet tablet) + throws StatementExecutionException, IoTDBConnectionException { + try { + session.insertAlignedTablet(tablet); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertAlignedTablet(Tablet tablet, boolean sorted) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertAlignedTablet(tablet, sorted); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertTablets(Map<String, Tablet> tablets) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertTablets(tablets); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertTablets(Map<String, Tablet> tablets, boolean sorted) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertTablets(tablets, sorted); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertAlignedTablets(Map<String, Tablet> tablets) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertAlignedTablets(tablets); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.insertAlignedTablets(tablets, sorted); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void testInsertTablet(Tablet tablet) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.testInsertTablet(tablet); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void testInsertTablet(Tablet tablet, boolean sorted) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.testInsertTablet(tablet, sorted); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void testInsertTablets(Map<String, Tablet> tablets) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.testInsertTablets(tablets); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void testInsertTablets(Map<String, Tablet> tablets, boolean sorted) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.testInsertTablets(tablets, sorted); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void testInsertRecords( + List<String> deviceIds, + List<Long> times, + List<List<String>> measurementsList, + List<List<String>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.testInsertRecords(deviceIds, times, measurementsList, valuesList); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void testInsertRecords( + List<String> deviceIds, + List<Long> times, + List<List<String>> measurementsList, + List<List<TSDataType>> typesList, + List<List<Object>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.testInsertRecords(deviceIds, times, measurementsList, typesList, valuesList); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void testInsertRecord( + String deviceId, long time, List<String> measurements, List<String> values) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.testInsertRecord(deviceId, time, measurements, values); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void testInsertRecord( + String deviceId, + long time, + List<String> measurements, + List<TSDataType> types, + List<Object> values) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.testInsertRecord(deviceId, time, measurements, types, values); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void deleteTimeseries(String path) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.deleteTimeseries(path); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void deleteTimeseries(List<String> paths) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.deleteTimeseries(paths); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void deleteData(String path, long endTime) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.deleteData(path, endTime); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void deleteData(List<String> paths, long endTime) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.deleteData(paths, endTime); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void deleteData(List<String> paths, long startTime, long endTime) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.deleteData(paths, startTime, endTime); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void setSchemaTemplate(String templateName, String prefixPath) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.setSchemaTemplate(templateName, prefixPath); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void createSchemaTemplate(Template template) + throws IOException, IoTDBConnectionException, StatementExecutionException { + try { + session.createSchemaTemplate(template); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void createSchemaTemplate( + String templateName, + List<String> measurements, + List<TSDataType> dataTypes, + List<TSEncoding> encodings, + List<CompressionType> compressors, + boolean isAligned) + throws IOException, IoTDBConnectionException, StatementExecutionException { + try { + session.createSchemaTemplate( + templateName, measurements, dataTypes, encodings, compressors, isAligned); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void createSchemaTemplate( + String name, + List<String> schemaNames, + List<List<String>> measurements, + List<List<TSDataType>> dataTypes, + List<List<TSEncoding>> encodings, + List<CompressionType> compressors) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.createSchemaTemplate( + name, schemaNames, measurements, dataTypes, encodings, compressors); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void addAlignedMeasurementsInTemplate( + String templateName, + List<String> measurementsPath, + List<TSDataType> dataTypes, + List<TSEncoding> encodings, + List<CompressionType> compressors) + throws IOException, IoTDBConnectionException, StatementExecutionException { + try { + session.addAlignedMeasurementsInTemplate( + templateName, measurementsPath, dataTypes, encodings, compressors); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void addAlignedMeasurementInTemplate( + String templateName, + String measurementPath, + TSDataType dataType, + TSEncoding encoding, + CompressionType compressor) + throws IOException, IoTDBConnectionException, StatementExecutionException { + try { + session.addAlignedMeasurementInTemplate( + templateName, measurementPath, dataType, encoding, compressor); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void addUnalignedMeasurementsInTemplate( + String templateName, + List<String> measurementsPath, + List<TSDataType> dataTypes, + List<TSEncoding> encodings, + List<CompressionType> compressors) + throws IOException, IoTDBConnectionException, StatementExecutionException { + try { + session.addUnalignedMeasurementsInTemplate( + templateName, measurementsPath, dataTypes, encodings, compressors); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void addUnalignedMeasurementInTemplate( + String templateName, + String measurementPath, + TSDataType dataType, + TSEncoding encoding, + CompressionType compressor) + throws IOException, IoTDBConnectionException, StatementExecutionException { + try { + session.addUnalignedMeasurementInTemplate( + templateName, measurementPath, dataType, encoding, compressor); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void deleteNodeInTemplate(String templateName, String path) + throws IOException, IoTDBConnectionException, StatementExecutionException { + try { + session.deleteNodeInTemplate(templateName, path); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public int countMeasurementsInTemplate(String name) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.countMeasurementsInTemplate(name); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public boolean isMeasurementInTemplate(String templateName, String path) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.isMeasurementInTemplate(templateName, path); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public boolean isPathExistInTemplate(String templateName, String path) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.isPathExistInTemplate(templateName, path); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public List<String> showMeasurementsInTemplate(String templateName) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.showMeasurementsInTemplate(templateName); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public List<String> showMeasurementsInTemplate(String templateName, String pattern) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.showMeasurementsInTemplate(templateName, pattern); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public List<String> showAllTemplates() + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.showAllTemplates(); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public List<String> showPathsTemplateSetOn(String templateName) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.showPathsTemplateSetOn(templateName); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public List<String> showPathsTemplateUsingOn(String templateName) + throws StatementExecutionException, IoTDBConnectionException { + try { + return session.showPathsTemplateUsingOn(templateName); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void unsetSchemaTemplate(String prefixPath, String templateName) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.unsetSchemaTemplate(prefixPath, templateName); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void dropSchemaTemplate(String templateName) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.dropSchemaTemplate(templateName); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void createTimeseriesUsingSchemaTemplate(List<String> devicePathList) + throws IoTDBConnectionException, StatementExecutionException { + try { + session.createTimeseriesUsingSchemaTemplate(devicePathList); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public boolean isEnableQueryRedirection() { + return session.isEnableQueryRedirection(); + } + + @Override + public void setEnableQueryRedirection(boolean enableQueryRedirection) { + session.setEnableQueryRedirection(enableQueryRedirection); + } + + @Override + public boolean isEnableRedirection() { + return session.isEnableRedirection(); + } + + @Override + public void setEnableRedirection(boolean enableRedirection) { + session.setEnableRedirection(enableRedirection); + } + + @Override + public void sortTablet(Tablet tablet) { + session.sortTablet(tablet); + } + + @Override + public TSBackupConfigurationResp getBackupConfiguration() + throws IoTDBConnectionException, StatementExecutionException { + try { + return session.getBackupConfiguration(); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException { + try { + return session.fetchAllConnections(); + } catch (IoTDBConnectionException e) { + sessionPool.cleanSessionAndMayThrowConnectionException(session); + closed.set(true); + session = null; + throw e; + } + } + + @Override + public void setQueryTimeout(long timeoutInMs) { + session.setQueryTimeout(timeoutInMs); + } + + @Override + public long getQueryTimeout() { + return session.getQueryTimeout(); + } +} diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java index d035400efc5..b70506ead24 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java @@ -75,7 +75,14 @@ public class SubscriptionSession extends Session { "Subscription session must be configured with an endpoint."); } return new SubscriptionSessionConnection( - session, endpoint, zoneId, availableNodes, maxRetryCount, retryIntervalInMs, sqlDialect, database); + session, + endpoint, + zoneId, + availableNodes, + maxRetryCount, + retryIntervalInMs, + sqlDialect, + database); } /////////////////////////////// topic /////////////////////////////// diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java index 7cca80e0402..cc900abb7ba 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java @@ -55,7 +55,15 @@ public class SubscriptionSessionConnection extends SessionConnection { String sqlDialect, String database) throws IoTDBConnectionException { - super(session, endPoint, zoneId, availableNodes, maxRetryCount, retryIntervalInMs, sqlDialect, database); + super( + session, + endPoint, + zoneId, + availableNodes, + maxRetryCount, + retryIntervalInMs, + sqlDialect, + database); } // from org.apache.iotdb.session.NodesSupplier.updateDataNodeList diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index fd4d23ba535..c5f485b680d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -92,6 +92,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimePa import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use; import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.StatementType; @@ -299,6 +300,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long startTime = System.nanoTime(); StatementType statementType = null; Throwable t = null; + boolean useDatabase = false; try { // create and cache dataset ExecutionResult result; @@ -339,6 +341,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement s = relationSqlParser.createStatement(statement, clientSession.getZoneId()); + if (s instanceof Use) { + useDatabase = true; + } + if (s == null) { return RpcUtils.getTSExecuteStatementResp( RpcUtils.getStatus( @@ -381,6 +387,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } else { finished = true; resp = RpcUtils.getTSExecuteStatementResp(result.status); + // set for use XX + if (useDatabase) { + resp.setDatabase(clientSession.getDatabaseName()); + } } return resp; } @@ -1193,6 +1203,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION); return resp.setSessionId(-1); } + Optional<String> database = parseDatabase(req); + IClientSession clientSession = SESSION_MANAGER.getCurrSession(); BasicOpenSessionResp openSessionResp = SESSION_MANAGER.login( SESSION_MANAGER.getCurrSession(), @@ -1203,6 +1215,10 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { clientVersion, sqlDialect); TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), openSessionResp.getMessage()); + + if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() && database.isPresent()) { + clientSession.setDatabaseName(database.get()); + } TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, CURRENT_RPC_VERSION); return resp.setSessionId(openSessionResp.getSessionId()); } @@ -1231,6 +1247,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } } + private Optional<String> parseDatabase(TSOpenSessionReq req) { + Map<String, String> configuration = req.configuration; + return configuration == null ? Optional.empty() : Optional.ofNullable(configuration.get("db")); + } + @Override public TSStatus closeSession(TSCloseSessionReq req) { return new TSStatus( diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift index 917a9a57900..9f5c5e7a8ce 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift @@ -70,6 +70,8 @@ struct TSExecuteStatementResp { 12: optional TSTracingInfo tracingInfo 13: optional list<binary> queryResult 14: optional bool moreData + // only be set while executing use XXX successfully + 15: optional string database } enum TSProtocolVersion {
