This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/useXX
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2fd763a3020a95106947b3c774ac09abada370d7
Author: JackieTien97 <[email protected]>
AuthorDate: Mon Jul 8 20:27:49 2024 +0800

    Add SessionWrapper
---
 .../iotdb/itbase/runtime/ClusterTestStatement.java |    4 +-
 .../org/apache/iotdb/isession/IPooledSession.java  |   64 +
 .../org/apache/iotdb/jdbc/IoTDBConnection.java     |    5 +-
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java |    4 +
 .../src/main/java/org/apache/iotdb/jdbc/Utils.java |   51 +-
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |    4 +
 .../java/org/apache/iotdb/session/Session.java     |   27 +-
 .../apache/iotdb/session/SessionConnection.java    |    8 +-
 .../org/apache/iotdb/session/pool/SessionPool.java |   17 +-
 .../apache/iotdb/session/pool/SessionWrapper.java  | 1504 ++++++++++++++++++++
 .../session/subscription/SubscriptionSession.java  |    9 +-
 .../SubscriptionSessionConnection.java             |   10 +-
 .../protocol/thrift/impl/ClientRPCServiceImpl.java |   21 +
 .../thrift-datanode/src/main/thrift/client.thrift  |    2 +
 14 files changed, 1700 insertions(+), 30 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
 
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
index f8c3e38ea57..d99ea24717e 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/runtime/ClusterTestStatement.java
@@ -31,6 +31,8 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.iotdb.rpc.RpcUtils.isUseDatabase;
+
 /** The implementation of {@link ClusterTestStatement} in cluster test. */
 public class ClusterTestStatement implements Statement {
 
@@ -190,7 +192,7 @@ public class ClusterTestStatement implements Statement {
     sql = sql.trim();
     boolean result = writeStatement.execute(sql);
     // if use XXXX, sendRequest to all statements
-    if (sql.length() > 4 && "use ".equalsIgnoreCase(sql.substring(0, 4))) {
+    if (isUseDatabase(sql)) {
       for (Statement readStatement : readStatements) {
         boolean tmp = readStatement.execute(sql);
         result = result && tmp;
diff --git 
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java
 
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java
new file mode 100644
index 00000000000..04c25944a97
--- /dev/null
+++ 
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/IPooledSession.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.isession;
+
+import org.apache.iotdb.isession.util.Version;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
+
+import org.apache.thrift.TException;
+import org.apache.tsfile.write.record.Tablet;
+
+public interface IPooledSession extends AutoCloseable {
+
+  Version getVersion();
+
+  int getFetchSize();
+
+  void close() throws IoTDBConnectionException;
+
+  String getTimeZone();
+
+  SessionDataSet executeQueryStatement(String sql)
+      throws StatementExecutionException, IoTDBConnectionException;
+
+  SessionDataSet executeQueryStatement(String sql, long timeoutInMs)
+      throws StatementExecutionException, IoTDBConnectionException;
+
+  void executeNonQueryStatement(String sql)
+      throws IoTDBConnectionException, StatementExecutionException;
+
+  String getTimestampPrecision() throws TException;
+
+  void insertTablet(Tablet tablet) throws StatementExecutionException, 
IoTDBConnectionException;
+
+  boolean isEnableQueryRedirection();
+
+  boolean isEnableRedirection();
+
+  TSBackupConfigurationResp getBackupConfiguration()
+      throws IoTDBConnectionException, StatementExecutionException;
+
+  TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException;
+
+  long getQueryTimeout();
+}
diff --git 
a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java 
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index f62ed9799fc..721a1217ca2 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -62,7 +62,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Executor;
-import javax.sql.DataSource;
 
 public class IoTDBConnection implements Connection {
 
@@ -595,4 +594,8 @@ public class IoTDBConnection implements Connection {
   public ServerProperties getServerProperties() throws TException {
     return getClient().getProperties();
   }
+
+  protected void changeDefaultDatabase(String database) {
+    params.setDb(database);
+  }
 }
diff --git 
a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java 
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index eaa080aaefe..4dd1d505587 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -321,6 +321,10 @@ public class IoTDBStatement implements Statement {
       throw new IoTDBSQLException(e.getMessage(), execResp.getStatus());
     }
 
+    if (execResp.isSetDatabase()) {
+      connection.changeDefaultDatabase(execResp.getDatabase());
+    }
+
     if (execResp.isSetColumns()) {
       queryId = execResp.getQueryId();
       if (execResp.queryResult == null) {
diff --git a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java 
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java
index 41283e24e42..7f90df75145 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/Utils.java
@@ -33,10 +33,10 @@ public class Utils {
     "squid:S5843",
     "squid:S5998"
   }) // Regular expressions should not be too complicated
-  static final Pattern SUFFIX_URL_PATTERN = 
Pattern.compile("([0-9]{1,5})(/|\\\\?.*=.*(&.*=.*)*)?");
+  static final Pattern SUFFIX_URL_PATTERN = 
Pattern.compile("(/|\\\\?.*=.*(&.*=.*)*)?");
 
   static final String COLON = ":";
-  static final String SLASH = "/";
+  static final char SLASH = '/';
   static final String PARAMETER_SEPARATOR = "?";
 
   static final String RPC_COMPRESS = "rpc_compress";
@@ -59,29 +59,42 @@ public class Utils {
       String subURL = url.substring(Config.IOTDB_URL_PREFIX.length());
       int i = subURL.lastIndexOf(COLON);
       host = subURL.substring(0, i);
+      params.setHost(host);
       suffixURL = subURL.substring(i + 1);
 
-      matcher = SUFFIX_URL_PATTERN.matcher(suffixURL);
-      if (matcher.matches() && parseUrlParam(subURL, info)) {
-        isUrlLegal = true;
+      // parse port
+      int port = 0;
+      for (; i < subURL.length() && Character.isDigit(subURL.charAt(i)); i++) {
+        port = port * 10 + (subURL.charAt(i) - '0');
+      }
+      // legal port
+      if (port >= 1 && port <= 65535) {
+        params.setPort(port);
+        // parse database
+        if (subURL.charAt(i) == SLASH) {
+          int endIndex = subURL.indexOf(PARAMETER_SEPARATOR, i + 1);
+          String database;
+          if (endIndex <= i + 1) {
+            database = subURL.substring(i + 1);
+            suffixURL = "";
+          } else {
+            database = subURL.substring(i + 1, endIndex);
+            suffixURL = subURL.substring(endIndex);
+          }
+          params.setDb(database);
+        }
+
+        matcher = SUFFIX_URL_PATTERN.matcher(suffixURL);
+        if (matcher.matches() && parseUrlParam(subURL, info)) {
+          isUrlLegal = true;
+        }
       }
     }
     if (!isUrlLegal) {
       throw new IoTDBURLException(
-          "Error url format, url should be jdbc:iotdb://anything:port/ or 
jdbc:iotdb://anything:port?property1=value1&property2=value2");
+          "Error url format, url should be 
jdbc:iotdb://anything:port/[database] or 
jdbc:iotdb://anything:port[/database]?property1=value1&property2=value2");
     }
 
-    params.setHost(host);
-
-    // parse port
-    String port = suffixURL;
-    if (suffixURL.contains(PARAMETER_SEPARATOR)) {
-      port = suffixURL.split("\\" + PARAMETER_SEPARATOR)[0];
-    } else if (suffixURL.contains(SLASH)) {
-      port = suffixURL.substring(0, suffixURL.length() - 1);
-    }
-    params.setPort(Integer.parseInt(port));
-
     if (info.containsKey(Config.AUTH_USER)) {
       params.setUsername(info.getProperty(Config.AUTH_USER));
     }
@@ -120,9 +133,6 @@ public class Utils {
     if (info.containsKey(Config.SQL_DIALECT)) {
       params.setSqlDialect(info.getProperty(Config.SQL_DIALECT));
     }
-    if (info.containsKey(Config.DATABASE)) {
-      params.setDb(info.getProperty(Config.DATABASE));
-    }
 
     return params;
   }
@@ -162,7 +172,6 @@ public class Utils {
         case Config.VERSION:
         case Config.NETWORK_TIMEOUT:
         case Config.SQL_DIALECT:
-        case Config.DATABASE:
           info.put(key, value);
           break;
         case Config.TIME_ZONE:
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index e48054abb19..06aac75aa66 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -342,4 +342,8 @@ public class RpcUtils {
         : new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode())
             .setMessage(failedStatus.toString());
   }
+
+  public static boolean isUseDatabase(String sql) {
+    return sql.length() > 4 && "use ".equalsIgnoreCase(sql.substring(0, 4));
+  }
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 18a9142335c..5e0d205a3b5 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -89,6 +89,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -176,7 +177,8 @@ public class Session implements ISession {
 
   protected String sqlDialect = SessionConfig.SQL_DIALECT;
 
-  protected String database;
+  // may be null
+  protected volatile String database;
 
   private static final String REDIRECT_TWICE = "redirect twice";
 
@@ -606,7 +608,14 @@ public class Session implements ISession {
           session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs, 
sqlDialect, database);
     }
     return new SessionConnection(
-        session, endpoint, zoneId, availableNodes, maxRetryCount, 
retryIntervalInMs, sqlDialect, database);
+        session,
+        endpoint,
+        zoneId,
+        availableNodes,
+        maxRetryCount,
+        retryIntervalInMs,
+        sqlDialect,
+        database);
   }
 
   @Override
@@ -939,7 +948,13 @@ public class Session implements ISession {
   @Override
   public void executeNonQueryStatement(String sql)
       throws IoTDBConnectionException, StatementExecutionException {
+    String previousDB = database;
     defaultSessionConnection.executeNonQueryStatement(sql);
+    if (!Objects.equals(previousDB, database) && endPointToSessionConnection 
!= null) {
+      for (SessionConnection sessionConnection : 
endPointToSessionConnection.values()) {
+        sessionConnection.executeNonQueryStatement(sql);
+      }
+    }
   }
 
   /**
@@ -3829,6 +3844,14 @@ public class Session implements ISession {
     return defaultSessionConnection.fetchAllConnections();
   }
 
+  protected void changeDatabase(String database) {
+    this.database = database;
+  }
+
+  public String getDatabase() {
+    return database;
+  }
+
   public static class Builder {
     private String host = SessionConfig.DEFAULT_HOST;
     private int rpcPort = SessionConfig.DEFAULT_PORT;
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index c6682ed8f2a..37f9f72f39d 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -206,7 +206,7 @@ public class SessionConnection {
     openReq.putToConfiguration("version", session.version.toString());
     openReq.putToConfiguration("sql_dialect", sqlDialect);
     if (database != null) {
-      openReq.putToConfiguration("database", database);
+      openReq.putToConfiguration("db", database);
     }
 
     try {
@@ -511,7 +511,11 @@ public class SessionConnection {
       throws TException {
     request.setSessionId(sessionId);
     request.setStatementId(statementId);
-    return client.executeUpdateStatementV2(request).status;
+    TSExecuteStatementResp resp = client.executeUpdateStatementV2(request);
+    if (resp.isSetDatabase()) {
+      session.changeDatabase(resp.getDatabase());
+    }
+    return resp.status;
   }
 
   protected SessionDataSet executeRawDataQuery(
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 8eeeb950f51..6979c314787 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -57,6 +57,8 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static org.apache.iotdb.rpc.RpcUtils.isUseDatabase;
+
 /**
  * SessionPool is a wrapper of a Session Set. Using SessionPool, the user do 
not need to consider
  * how to reuse a session connection. Even if the session is disconnected, the 
session pool can
@@ -155,6 +157,7 @@ public class SessionPool implements ISessionPool {
 
   protected String sqlDialect = SessionConfig.SQL_DIALECT;
 
+  // may be null
   protected String database;
 
   private static final String INSERT_RECORD_FAIL = "insertRecord failed";
@@ -720,7 +723,7 @@ public class SessionPool implements ISessionPool {
   }
 
   @SuppressWarnings({"squid:S2446"})
-  private void putBack(ISession session) {
+  protected void putBack(ISession session) {
     queue.push(session);
     synchronized (this) {
       // we do not need to notifyAll as any waited thread can continue to work 
after waked up.
@@ -834,6 +837,11 @@ public class SessionPool implements ISessionPool {
     }
   }
 
+  protected void cleanSessionAndMayThrowConnectionException(ISession session) {
+    closeSession(session);
+    tryConstructNewSession();
+  }
+
   /**
    * insert the data of a device. For each timestamp, the number of 
measurements is the same.
    *
@@ -3036,6 +3044,13 @@ public class SessionPool implements ISessionPool {
   @Override
   public void executeNonQueryStatement(String sql)
       throws StatementExecutionException, IoTDBConnectionException {
+
+    // use XXX is forbidden in SessionPool.executeNonQueryStatement
+    if (isUseDatabase(sql)) {
+      throw new IllegalArgumentException(
+          String.format("SessionPool doesn't support executing %s directly", 
sql));
+    }
+
     ISession session = getSession();
     try {
       session.executeNonQueryStatement(sql);
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionWrapper.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionWrapper.java
new file mode 100644
index 00000000000..1214859692a
--- /dev/null
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionWrapper.java
@@ -0,0 +1,1504 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.pool;
+
+import org.apache.iotdb.common.rpc.thrift.TAggregationType;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.isession.INodeSupplier;
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.isession.template.Template;
+import org.apache.iotdb.isession.util.Version;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSBackupConfigurationResp;
+import org.apache.iotdb.service.rpc.thrift.TSConnectionInfoResp;
+import org.apache.iotdb.session.Session;
+
+import org.apache.thrift.TException;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.write.record.Tablet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * used for SessionPool.getSession need to do some other things like calling
+ * cleanSessionAndMayThrowConnectionException in SessionPool while 
encountering connection exception
+ * only need to putBack to SessionPool while closing
+ */
+public class SessionWrapper implements ISession {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SessionWrapper.class);
+
+  private Session session;
+
+  private final SessionPool sessionPool;
+
+  private final AtomicBoolean closed;
+
+  public SessionWrapper(Session session, SessionPool sessionPool) {
+    this.session = session;
+    this.sessionPool = sessionPool;
+    this.closed = new AtomicBoolean(false);
+  }
+
+  @Override
+  public Version getVersion() {
+    return session.getVersion();
+  }
+
+  @Override
+  public void setVersion(Version version) {
+    session.setVersion(version);
+  }
+
+  @Override
+  public int getFetchSize() {
+    return session.getFetchSize();
+  }
+
+  @Override
+  public void setFetchSize(int fetchSize) {
+    session.setFetchSize(fetchSize);
+  }
+
+  @Override
+  public void open() throws IoTDBConnectionException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void open(boolean enableRPCCompression) throws 
IoTDBConnectionException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void open(boolean enableRPCCompression, int connectionTimeoutInMs)
+      throws IoTDBConnectionException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void open(
+      boolean enableRPCCompression,
+      int connectionTimeoutInMs,
+      Map<String, TEndPoint> deviceIdToEndpoint,
+      INodeSupplier nodeSupplier)
+      throws IoTDBConnectionException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void close() throws IoTDBConnectionException {
+    if (closed.compareAndSet(false, true)) {
+      if (!Objects.equals(session.getDatabase(), sessionPool.database)
+          && sessionPool.database != null) {
+        try {
+          session.executeNonQueryStatement("use " + sessionPool.database);
+        } catch (StatementExecutionException e) {
+          LOGGER.warn(
+              "Failed to change back database by executing: use " + 
sessionPool.database, e);
+        }
+      }
+      sessionPool.putBack(session);
+      session = null;
+    }
+  }
+
+  @Override
+  public String getTimeZone() {
+    return session.getTimeZone();
+  }
+
+  @Override
+  public void setTimeZone(String zoneId)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      session.setTimeZone(zoneId);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void setTimeZoneOfSession(String zoneId) {
+    session.setTimeZoneOfSession(zoneId);
+  }
+
+  @Override
+  public void setStorageGroup(String storageGroup)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.setStorageGroup(storageGroup);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void deleteStorageGroup(String storageGroup)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.deleteStorageGroup(storageGroup);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void deleteStorageGroups(List<String> storageGroups)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.deleteStorageGroups(storageGroups);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void createDatabase(String database)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.createDatabase(database);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void deleteDatabase(String database)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.deleteDatabase(database);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void deleteDatabases(List<String> databases)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.deleteDatabases(databases);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void createTimeseries(
+      String path, TSDataType dataType, TSEncoding encoding, CompressionType 
compressor)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.createTimeseries(path, dataType, encoding, compressor);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void createTimeseries(
+      String path,
+      TSDataType dataType,
+      TSEncoding encoding,
+      CompressionType compressor,
+      Map<String, String> props,
+      Map<String, String> tags,
+      Map<String, String> attributes,
+      String measurementAlias)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.createTimeseries(
+          path, dataType, encoding, compressor, props, tags, attributes, 
measurementAlias);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void createAlignedTimeseries(
+      String deviceId,
+      List<String> measurements,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      List<String> measurementAliasList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.createAlignedTimeseries(
+          deviceId, measurements, dataTypes, encodings, compressors, 
measurementAliasList);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void createAlignedTimeseries(
+      String deviceId,
+      List<String> measurements,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      List<String> measurementAliasList,
+      List<Map<String, String>> tagsList,
+      List<Map<String, String>> attributesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.createAlignedTimeseries(
+          deviceId,
+          measurements,
+          dataTypes,
+          encodings,
+          compressors,
+          measurementAliasList,
+          tagsList,
+          attributesList);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void createMultiTimeseries(
+      List<String> paths,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      List<Map<String, String>> propsList,
+      List<Map<String, String>> tagsList,
+      List<Map<String, String>> attributesList,
+      List<String> measurementAliasList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.createMultiTimeseries(
+          paths,
+          dataTypes,
+          encodings,
+          compressors,
+          propsList,
+          tagsList,
+          attributesList,
+          measurementAliasList);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public boolean checkTimeseriesExists(String path)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      return session.checkTimeseriesExists(path);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public SessionDataSet executeQueryStatement(String sql)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.executeQueryStatement(sql);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public SessionDataSet executeQueryStatement(String sql, long timeoutInMs)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.executeQueryStatement(sql, timeoutInMs);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void executeNonQueryStatement(String sql)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.executeNonQueryStatement(sql);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public SessionDataSet executeRawDataQuery(
+      List<String> paths, long startTime, long endTime, long timeOut)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.executeRawDataQuery(paths, startTime, endTime, timeOut);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public SessionDataSet executeRawDataQuery(List<String> paths, long 
startTime, long endTime)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.executeRawDataQuery(paths, startTime, endTime);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public SessionDataSet executeLastDataQuery(List<String> paths, long lastTime)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.executeLastDataQuery(paths, lastTime);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public SessionDataSet executeLastDataQuery(List<String> paths, long 
lastTime, long timeOut)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.executeLastDataQuery(paths, lastTime, timeOut);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public SessionDataSet executeLastDataQuery(List<String> paths)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.executeLastDataQuery(paths);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public SessionDataSet executeLastDataQueryForOneDevice(
+      String db, String device, List<String> sensors, boolean isLegalPathNodes)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.executeLastDataQueryForOneDevice(db, device, sensors, 
isLegalPathNodes);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public SessionDataSet executeAggregationQuery(
+      List<String> paths, List<TAggregationType> aggregations)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.executeAggregationQuery(paths, aggregations);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public SessionDataSet executeAggregationQuery(
+      List<String> paths, List<TAggregationType> aggregations, long startTime, 
long endTime)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.executeAggregationQuery(paths, aggregations, startTime, 
endTime);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public SessionDataSet executeAggregationQuery(
+      List<String> paths,
+      List<TAggregationType> aggregations,
+      long startTime,
+      long endTime,
+      long interval)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.executeAggregationQuery(paths, aggregations, startTime, 
endTime, interval);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public SessionDataSet executeAggregationQuery(
+      List<String> paths,
+      List<TAggregationType> aggregations,
+      long startTime,
+      long endTime,
+      long interval,
+      long slidingStep)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.executeAggregationQuery(
+          paths, aggregations, startTime, endTime, interval, slidingStep);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertRecord(
+      String deviceId,
+      long time,
+      List<String> measurements,
+      List<TSDataType> types,
+      Object... values)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertRecord(deviceId, time, measurements, types, values);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertRecord(
+      String deviceId,
+      long time,
+      List<String> measurements,
+      List<TSDataType> types,
+      List<Object> values)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertRecord(deviceId, time, measurements, types, values);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertAlignedRecord(
+      String deviceId,
+      long time,
+      List<String> measurements,
+      List<TSDataType> types,
+      List<Object> values)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertAlignedRecord(deviceId, time, measurements, types, values);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertRecord(
+      String deviceId, long time, List<String> measurements, List<String> 
values)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertRecord(deviceId, time, measurements, values);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public String getTimestampPrecision() throws TException {
+    try {
+      return session.getTimestampPrecision();
+    } catch (TException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertAlignedRecord(
+      String deviceId, long time, List<String> measurements, List<String> 
values)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertAlignedRecord(deviceId, time, measurements, values);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertRecords(
+      List<String> deviceIds,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<String>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertRecords(deviceIds, times, measurementsList, valuesList);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertAlignedRecords(
+      List<String> deviceIds,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<String>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertAlignedRecords(deviceIds, times, measurementsList, 
valuesList);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertRecords(
+      List<String> deviceIds,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertRecords(deviceIds, times, measurementsList, typesList, 
valuesList);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertAlignedRecords(
+      List<String> deviceIds,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertAlignedRecords(deviceIds, times, measurementsList, 
typesList, valuesList);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertRecordsOfOneDevice(
+      String deviceId,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertRecordsOfOneDevice(deviceId, times, measurementsList, 
typesList, valuesList);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertRecordsOfOneDevice(
+      String deviceId,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList,
+      boolean haveSorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertRecordsOfOneDevice(
+          deviceId, times, measurementsList, typesList, valuesList, 
haveSorted);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertStringRecordsOfOneDevice(
+      String deviceId,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<String>> valuesList,
+      boolean haveSorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertStringRecordsOfOneDevice(
+          deviceId, times, measurementsList, valuesList, haveSorted);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertStringRecordsOfOneDevice(
+      String deviceId,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<String>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertStringRecordsOfOneDevice(deviceId, times, 
measurementsList, valuesList);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertAlignedRecordsOfOneDevice(
+      String deviceId,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertAlignedRecordsOfOneDevice(
+          deviceId, times, measurementsList, typesList, valuesList);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertAlignedRecordsOfOneDevice(
+      String deviceId,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList,
+      boolean haveSorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertAlignedRecordsOfOneDevice(
+          deviceId, times, measurementsList, typesList, valuesList, 
haveSorted);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertAlignedStringRecordsOfOneDevice(
+      String deviceId,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<String>> valuesList,
+      boolean haveSorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertAlignedStringRecordsOfOneDevice(
+          deviceId, times, measurementsList, valuesList, haveSorted);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertAlignedStringRecordsOfOneDevice(
+      String deviceId,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<String>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertAlignedStringRecordsOfOneDevice(deviceId, times, 
measurementsList, valuesList);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertTablet(Tablet tablet)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      session.insertTablet(tablet);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertTablet(Tablet tablet, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertTablet(tablet, sorted);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertAlignedTablet(Tablet tablet)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      session.insertAlignedTablet(tablet);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertAlignedTablet(Tablet tablet, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertAlignedTablet(tablet, sorted);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertTablets(Map<String, Tablet> tablets)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertTablets(tablets);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertTablets(tablets, sorted);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertAlignedTablets(Map<String, Tablet> tablets)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertAlignedTablets(tablets);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.insertAlignedTablets(tablets, sorted);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void testInsertTablet(Tablet tablet)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.testInsertTablet(tablet);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void testInsertTablet(Tablet tablet, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.testInsertTablet(tablet, sorted);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void testInsertTablets(Map<String, Tablet> tablets)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.testInsertTablets(tablets);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void testInsertTablets(Map<String, Tablet> tablets, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.testInsertTablets(tablets, sorted);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void testInsertRecords(
+      List<String> deviceIds,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<String>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.testInsertRecords(deviceIds, times, measurementsList, 
valuesList);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void testInsertRecords(
+      List<String> deviceIds,
+      List<Long> times,
+      List<List<String>> measurementsList,
+      List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.testInsertRecords(deviceIds, times, measurementsList, typesList, 
valuesList);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void testInsertRecord(
+      String deviceId, long time, List<String> measurements, List<String> 
values)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.testInsertRecord(deviceId, time, measurements, values);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void testInsertRecord(
+      String deviceId,
+      long time,
+      List<String> measurements,
+      List<TSDataType> types,
+      List<Object> values)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.testInsertRecord(deviceId, time, measurements, types, values);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void deleteTimeseries(String path)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.deleteTimeseries(path);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void deleteTimeseries(List<String> paths)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.deleteTimeseries(paths);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void deleteData(String path, long endTime)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.deleteData(path, endTime);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void deleteData(List<String> paths, long endTime)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.deleteData(paths, endTime);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void deleteData(List<String> paths, long startTime, long endTime)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.deleteData(paths, startTime, endTime);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void setSchemaTemplate(String templateName, String prefixPath)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.setSchemaTemplate(templateName, prefixPath);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void createSchemaTemplate(Template template)
+      throws IOException, IoTDBConnectionException, 
StatementExecutionException {
+    try {
+      session.createSchemaTemplate(template);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void createSchemaTemplate(
+      String templateName,
+      List<String> measurements,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors,
+      boolean isAligned)
+      throws IOException, IoTDBConnectionException, 
StatementExecutionException {
+    try {
+      session.createSchemaTemplate(
+          templateName, measurements, dataTypes, encodings, compressors, 
isAligned);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void createSchemaTemplate(
+      String name,
+      List<String> schemaNames,
+      List<List<String>> measurements,
+      List<List<TSDataType>> dataTypes,
+      List<List<TSEncoding>> encodings,
+      List<CompressionType> compressors)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.createSchemaTemplate(
+          name, schemaNames, measurements, dataTypes, encodings, compressors);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void addAlignedMeasurementsInTemplate(
+      String templateName,
+      List<String> measurementsPath,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors)
+      throws IOException, IoTDBConnectionException, 
StatementExecutionException {
+    try {
+      session.addAlignedMeasurementsInTemplate(
+          templateName, measurementsPath, dataTypes, encodings, compressors);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void addAlignedMeasurementInTemplate(
+      String templateName,
+      String measurementPath,
+      TSDataType dataType,
+      TSEncoding encoding,
+      CompressionType compressor)
+      throws IOException, IoTDBConnectionException, 
StatementExecutionException {
+    try {
+      session.addAlignedMeasurementInTemplate(
+          templateName, measurementPath, dataType, encoding, compressor);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void addUnalignedMeasurementsInTemplate(
+      String templateName,
+      List<String> measurementsPath,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors)
+      throws IOException, IoTDBConnectionException, 
StatementExecutionException {
+    try {
+      session.addUnalignedMeasurementsInTemplate(
+          templateName, measurementsPath, dataTypes, encodings, compressors);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void addUnalignedMeasurementInTemplate(
+      String templateName,
+      String measurementPath,
+      TSDataType dataType,
+      TSEncoding encoding,
+      CompressionType compressor)
+      throws IOException, IoTDBConnectionException, 
StatementExecutionException {
+    try {
+      session.addUnalignedMeasurementInTemplate(
+          templateName, measurementPath, dataType, encoding, compressor);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void deleteNodeInTemplate(String templateName, String path)
+      throws IOException, IoTDBConnectionException, 
StatementExecutionException {
+    try {
+      session.deleteNodeInTemplate(templateName, path);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public int countMeasurementsInTemplate(String name)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.countMeasurementsInTemplate(name);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public boolean isMeasurementInTemplate(String templateName, String path)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.isMeasurementInTemplate(templateName, path);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public boolean isPathExistInTemplate(String templateName, String path)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.isPathExistInTemplate(templateName, path);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public List<String> showMeasurementsInTemplate(String templateName)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.showMeasurementsInTemplate(templateName);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public List<String> showMeasurementsInTemplate(String templateName, String 
pattern)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.showMeasurementsInTemplate(templateName, pattern);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public List<String> showAllTemplates()
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.showAllTemplates();
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public List<String> showPathsTemplateSetOn(String templateName)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.showPathsTemplateSetOn(templateName);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public List<String> showPathsTemplateUsingOn(String templateName)
+      throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      return session.showPathsTemplateUsingOn(templateName);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void unsetSchemaTemplate(String prefixPath, String templateName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.unsetSchemaTemplate(prefixPath, templateName);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void dropSchemaTemplate(String templateName)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.dropSchemaTemplate(templateName);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void createTimeseriesUsingSchemaTemplate(List<String> devicePathList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      session.createTimeseriesUsingSchemaTemplate(devicePathList);
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public boolean isEnableQueryRedirection() {
+    return session.isEnableQueryRedirection();
+  }
+
+  @Override
+  public void setEnableQueryRedirection(boolean enableQueryRedirection) {
+    session.setEnableQueryRedirection(enableQueryRedirection);
+  }
+
+  @Override
+  public boolean isEnableRedirection() {
+    return session.isEnableRedirection();
+  }
+
+  @Override
+  public void setEnableRedirection(boolean enableRedirection) {
+    session.setEnableRedirection(enableRedirection);
+  }
+
+  @Override
+  public void sortTablet(Tablet tablet) {
+    session.sortTablet(tablet);
+  }
+
+  @Override
+  public TSBackupConfigurationResp getBackupConfiguration()
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      return session.getBackupConfiguration();
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public TSConnectionInfoResp fetchAllConnections() throws 
IoTDBConnectionException {
+    try {
+      return session.fetchAllConnections();
+    } catch (IoTDBConnectionException e) {
+      sessionPool.cleanSessionAndMayThrowConnectionException(session);
+      closed.set(true);
+      session = null;
+      throw e;
+    }
+  }
+
+  @Override
+  public void setQueryTimeout(long timeoutInMs) {
+    session.setQueryTimeout(timeoutInMs);
+  }
+
+  @Override
+  public long getQueryTimeout() {
+    return session.getQueryTimeout();
+  }
+}
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
index d035400efc5..b70506ead24 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
@@ -75,7 +75,14 @@ public class SubscriptionSession extends Session {
           "Subscription session must be configured with an endpoint.");
     }
     return new SubscriptionSessionConnection(
-        session, endpoint, zoneId, availableNodes, maxRetryCount, 
retryIntervalInMs, sqlDialect, database);
+        session,
+        endpoint,
+        zoneId,
+        availableNodes,
+        maxRetryCount,
+        retryIntervalInMs,
+        sqlDialect,
+        database);
   }
 
   /////////////////////////////// topic ///////////////////////////////
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
index 7cca80e0402..cc900abb7ba 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
@@ -55,7 +55,15 @@ public class SubscriptionSessionConnection extends 
SessionConnection {
       String sqlDialect,
       String database)
       throws IoTDBConnectionException {
-    super(session, endPoint, zoneId, availableNodes, maxRetryCount, 
retryIntervalInMs, sqlDialect, database);
+    super(
+        session,
+        endPoint,
+        zoneId,
+        availableNodes,
+        maxRetryCount,
+        retryIntervalInMs,
+        sqlDialect,
+        database);
   }
 
   // from org.apache.iotdb.session.NodesSupplier.updateDataNodeList
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index fd4d23ba535..c5f485b680d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -92,6 +92,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.GroupByTimePa
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Use;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
@@ -299,6 +300,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     long startTime = System.nanoTime();
     StatementType statementType = null;
     Throwable t = null;
+    boolean useDatabase = false;
     try {
       // create and cache dataset
       ExecutionResult result;
@@ -339,6 +341,10 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement s =
             relationSqlParser.createStatement(statement, 
clientSession.getZoneId());
 
+        if (s instanceof Use) {
+          useDatabase = true;
+        }
+
         if (s == null) {
           return RpcUtils.getTSExecuteStatementResp(
               RpcUtils.getStatus(
@@ -381,6 +387,10 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         } else {
           finished = true;
           resp = RpcUtils.getTSExecuteStatementResp(result.status);
+          // set for use XX
+          if (useDatabase) {
+            resp.setDatabase(clientSession.getDatabaseName());
+          }
         }
         return resp;
       }
@@ -1193,6 +1203,8 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, 
CURRENT_RPC_VERSION);
       return resp.setSessionId(-1);
     }
+    Optional<String> database = parseDatabase(req);
+    IClientSession clientSession = SESSION_MANAGER.getCurrSession();
     BasicOpenSessionResp openSessionResp =
         SESSION_MANAGER.login(
             SESSION_MANAGER.getCurrSession(),
@@ -1203,6 +1215,10 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
             clientVersion,
             sqlDialect);
     TSStatus tsStatus = RpcUtils.getStatus(openSessionResp.getCode(), 
openSessionResp.getMessage());
+
+    if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() && 
database.isPresent()) {
+      clientSession.setDatabaseName(database.get());
+    }
     TSOpenSessionResp resp = new TSOpenSessionResp(tsStatus, 
CURRENT_RPC_VERSION);
     return resp.setSessionId(openSessionResp.getSessionId());
   }
@@ -1231,6 +1247,11 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     }
   }
 
+  private Optional<String> parseDatabase(TSOpenSessionReq req) {
+    Map<String, String> configuration = req.configuration;
+    return configuration == null ? Optional.empty() : 
Optional.ofNullable(configuration.get("db"));
+  }
+
   @Override
   public TSStatus closeSession(TSCloseSessionReq req) {
     return new TSStatus(
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index 917a9a57900..9f5c5e7a8ce 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -70,6 +70,8 @@ struct TSExecuteStatementResp {
   12: optional TSTracingInfo tracingInfo
   13: optional list<binary> queryResult
   14: optional bool moreData
+  // only be set while executing use XXX successfully
+  15: optional string database
 }
 
 enum TSProtocolVersion {

Reply via email to