This is an automated email from the ASF dual-hosted git repository.

xuekaifeng pushed a commit to branch xkf_tpc_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 241e43f78dc0838f7620bfb8e78d53190a267def
Author: 151250176 <[email protected]>
AuthorDate: Mon Jan 4 12:59:04 2021 +0800

    for tpc
---
 .../apache/iotdb/db/service/AsyncInsertPool.java   |   51 +
 .../org/apache/iotdb/db/service/TSServiceImpl.java |    6 +
 .../org/apache/iotdb/db/serviceSession/Config.java |   31 +
 .../apache/iotdb/db/serviceSession/Session.java    | 1116 ++++++++++++++++++++
 .../iotdb/db/serviceSession/SessionConnection.java |  624 +++++++++++
 .../iotdb/db/serviceSession/SessionDataSet.java    |  234 ++++
 .../iotdb/db/serviceSession/SessionUtils.java      |   90 ++
 .../serviceSession/pool/SessionDataSetWrapper.java |  101 ++
 .../iotdb/db/serviceSession/pool/SessionPool.java  |  983 +++++++++++++++++
 .../iotdb/db/integration/IoTDBMultiDeviceIT.java   |  322 ++++++
 thrift/src/main/thrift/rpc.thrift                  |    1 +
 11 files changed, 3559 insertions(+)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/AsyncInsertPool.java 
b/server/src/main/java/org/apache/iotdb/db/service/AsyncInsertPool.java
new file mode 100644
index 0000000..23c7bf7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/service/AsyncInsertPool.java
@@ -0,0 +1,51 @@
+package org.apache.iotdb.db.service;
+
+import java.util.concurrent.ExecutorService;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.serviceSession.pool.SessionPool;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Process-wide singleton that re-sends {@link TSInsertTabletsReq} requests through a
+ * {@link SessionPool} on a background thread pool.
+ *
+ * <p>Failures while forwarding are logged and then dropped; nothing is reported back to
+ * the caller of {@link #submit(TSInsertTabletsReq)}.
+ */
+public class AsyncInsertPool {
+
+  private static final Logger logger = LoggerFactory.getLogger(AsyncInsertPool.class);
+
+  // NOTE(review): the forwarding target and credentials are hard-coded; they should be
+  // read from configuration before this class is used outside the test environment.
+  private static final String TARGET_HOST = "192.168.130.6";
+  private static final int TARGET_PORT = 6667;
+  private static final String TARGET_USER = "root";
+  private static final String TARGET_PASSWORD = "root";
+  // Fifth SessionPool constructor argument; presumably the pool's size limit - TODO confirm.
+  private static final int SESSION_POOL_SIZE = 20;
+  private static final int WORKER_THREADS = 10;
+
+  private final ExecutorService pool;
+  private final SessionPool sessionPool;
+
+  private AsyncInsertPool() {
+    sessionPool = new SessionPool(
+        TARGET_HOST, TARGET_PORT, TARGET_USER, TARGET_PASSWORD, SESSION_POOL_SIZE);
+    pool = IoTDBThreadPoolFactory.newFixedThreadPool(WORKER_THREADS, "async insert pool");
+  }
+
+  /**
+   * Queues {@code req} to be sent through the session pool on a worker thread.
+   *
+   * @param req request to forward
+   */
+  public void submit(TSInsertTabletsReq req) {
+    pool.submit(() -> {
+      try {
+        sessionPool.insertTablets(req);
+      } catch (IoTDBConnectionException | StatementExecutionException e) {
+        // best effort: the submitting thread has already moved on, so only log
+        logger.error("transfer request failed", e);
+      }
+    });
+  }
+
+  /** Returns the shared instance, creating it on first access. */
+  public static AsyncInsertPool getInstance() {
+    return InstanceHolder.INSTANCE;
+  }
+
+  /** Holder whose static initializer builds the single {@link AsyncInsertPool}. */
+  static class InstanceHolder {
+
+    private InstanceHolder() {
+      // forbidding instantiation
+    }
+
+    private static final AsyncInsertPool INSTANCE = new AsyncInsertPool();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java 
b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index e423b14..0d95ee2 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -1497,6 +1497,12 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
 
   @Override
   public TSStatus insertTablets(TSInsertTabletsReq req) {
+    // transfer to another
+    if(!req.isFinal){
+      req.isFinal = true;
+      AsyncInsertPool.getInstance().submit(req);
+    }
+    //
     long t1 = System.currentTimeMillis();
     try {
       if (!checkLogin(req.getSessionId())) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/serviceSession/Config.java 
b/server/src/main/java/org/apache/iotdb/db/serviceSession/Config.java
new file mode 100644
index 0000000..71339a8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/serviceSession/Config.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.serviceSession;
+
+public class Config { // default values read by Session in this package
+
+  public static final String DEFAULT_USER = "root"; // user for Session(host, rpcPort)
+  public static final String DEFAULT_PASSWORD = "root"; // password for Session(host, rpcPort)
+  public static final int DEFAULT_FETCH_SIZE = 10000; // fetchSize when a Session constructor is given none
+  public static final int DEFAULT_TIMEOUT_MS = 0; // passed by Session.open() as connectionTimeoutInMs; meaning of 0 not shown here - TODO confirm
+  public static final boolean DEFAULT_CACHE_LEADER_MODE = true; // when true, Session keeps a device -> endpoint cache and one connection per endpoint
+
+  public static final int RETRY_NUM = 3; // NOTE(review): not referenced in the visible code; presumably a retry count - confirm
+  public static final long RETRY_INTERVAL_MS = 1000; // NOTE(review): not referenced in the visible code; presumably the wait between retries - confirm
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/serviceSession/Session.java 
b/server/src/main/java/org/apache/iotdb/db/serviceSession/Session.java
new file mode 100644
index 0000000..9dbe2f0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/serviceSession/Session.java
@@ -0,0 +1,1116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.serviceSession;
+
+import java.nio.ByteBuffer;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.iotdb.rpc.BatchExecutionException;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.RedirectException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"java:S107", "java:S1135"}) // need enough parameters, 
ignore todos
+public class Session {
+
+  private static final Logger logger = LoggerFactory.getLogger(Session.class);
+  protected static final TSProtocolVersion protocolVersion = 
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
+  public static final String MSG_UNSUPPORTED_DATA_TYPE = "Unsupported data 
type:";
+  protected String username;
+  protected String password;
+  protected int fetchSize;
+  protected boolean enableRPCCompression;
+  protected int connectionTimeoutInMs;
+
+  private EndPoint defaultEndPoint;
+  private SessionConnection defaultSessionConnection;
+  protected boolean isClosed = true;
+  private ZoneId zoneId;
+
+  // Cluster version cache
+  private SessionConnection metaSessionConnection;
+  private Map<String, EndPoint> deviceIdToEndpoint;
+  private Map<EndPoint, SessionConnection> endPointToSessionConnection;
+  private AtomicReference<IoTDBConnectionException> tmp = new 
AtomicReference<>();
+
+  public Session(String host, int rpcPort) {
+    this(host, rpcPort, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD, 
Config.DEFAULT_FETCH_SIZE,
+        null);
+  }
+
+  public Session(String host, String rpcPort, String username, String 
password) {
+    this(host, Integer.parseInt(rpcPort), username, password, 
Config.DEFAULT_FETCH_SIZE, null);
+  }
+
+  public Session(String host, int rpcPort, String username, String password) {
+    this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, null);
+  }
+
+  public Session(String host, int rpcPort, String username, String password, 
int fetchSize) {
+    this(host, rpcPort, username, password, fetchSize, null);
+  }
+
+  public Session(String host, int rpcPort, String username, String password, 
ZoneId zoneId) {
+    this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, zoneId);
+  }
+
+  public Session(String host, int rpcPort, String username, String password, 
int fetchSize,
+      ZoneId zoneId) {
+    this.defaultEndPoint = new EndPoint(host, rpcPort);
+    this.username = username;
+    this.password = password;
+    this.fetchSize = fetchSize;
+    this.zoneId = zoneId;
+  }
+
+  public void setFetchSize(int fetchSize){
+    this.fetchSize = fetchSize;
+  }
+
+  public int getFetchSize(){ return this.fetchSize; }
+
+  public synchronized void open() throws IoTDBConnectionException {
+    open(false, Config.DEFAULT_TIMEOUT_MS);
+  }
+
+  public synchronized void open(boolean enableRPCCompression) throws 
IoTDBConnectionException {
+    open(enableRPCCompression, Config.DEFAULT_TIMEOUT_MS);
+  }
+
+  private synchronized void open(boolean enableRPCCompression, int 
connectionTimeoutInMs)
+      throws IoTDBConnectionException {
+    if (!isClosed) {
+      return;
+    }
+
+    this.enableRPCCompression = enableRPCCompression;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
+    defaultSessionConnection = new SessionConnection(this, defaultEndPoint, 
zoneId);
+    metaSessionConnection = defaultSessionConnection;
+    isClosed = false;
+    if (Config.DEFAULT_CACHE_LEADER_MODE) {
+      deviceIdToEndpoint = new HashMap<>();
+      endPointToSessionConnection = new HashMap<>();
+      endPointToSessionConnection.put(defaultEndPoint, 
defaultSessionConnection);
+    }
+  }
+
+  public synchronized void close() throws IoTDBConnectionException {
+    if (isClosed) {
+      return;
+    }
+    try {
+      if (Config.DEFAULT_CACHE_LEADER_MODE) {
+        for (SessionConnection sessionConnection : 
endPointToSessionConnection.values()) {
+          sessionConnection.close();
+        }
+      } else {
+        defaultSessionConnection.close();
+      }
+    } finally {
+      isClosed = true;
+    }
+  }
+
+  public synchronized String getTimeZone() {
+    return defaultSessionConnection.getTimeZone();
+  }
+
+  public synchronized void setTimeZone(String zoneId)
+      throws StatementExecutionException, IoTDBConnectionException {
+    defaultSessionConnection.setTimeZone(zoneId);
+  }
+
+  public void setStorageGroup(String storageGroup)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      metaSessionConnection.setStorageGroup(storageGroup);
+    } catch (RedirectException e) {
+      handleMetaRedirection(storageGroup, e);
+    }
+  }
+
+  public void deleteStorageGroup(String storageGroup)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      
metaSessionConnection.deleteStorageGroups(Collections.singletonList(storageGroup));
+    } catch (RedirectException e) {
+      handleMetaRedirection(storageGroup, e);
+    }
+  }
+
+  public void deleteStorageGroups(List<String> storageGroups)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      metaSessionConnection.deleteStorageGroups(storageGroups);
+    } catch (RedirectException e) {
+      handleMetaRedirection(storageGroups.toString(), e);
+    }
+  }
+
+  public void createTimeseries(String path, TSDataType dataType,
+      TSEncoding encoding, CompressionType compressor)
+      throws IoTDBConnectionException, StatementExecutionException {
+    TSCreateTimeseriesReq request = genTSCreateTimeseriesReq(path, dataType, 
encoding, compressor,
+        null, null, null, null);
+    defaultSessionConnection.createTimeseries(request);
+  }
+
+  public void createTimeseries(String path, TSDataType dataType,
+      TSEncoding encoding, CompressionType compressor, Map<String, String> 
props,
+      Map<String, String> tags, Map<String, String> attributes, String 
measurementAlias)
+      throws IoTDBConnectionException, StatementExecutionException {
+    TSCreateTimeseriesReq request = genTSCreateTimeseriesReq(path, dataType, 
encoding, compressor,
+        props, tags, attributes, measurementAlias);
+    defaultSessionConnection.createTimeseries(request);
+  }
+
+  private TSCreateTimeseriesReq genTSCreateTimeseriesReq(String path, 
TSDataType dataType,
+      TSEncoding encoding, CompressionType compressor, Map<String, String> 
props,
+      Map<String, String> tags, Map<String, String> attributes, String 
measurementAlias) {
+    TSCreateTimeseriesReq request = new TSCreateTimeseriesReq();
+    request.setPath(path);
+    request.setDataType(dataType.ordinal());
+    request.setEncoding(encoding.ordinal());
+    request.setCompressor(compressor.ordinal());
+    request.setProps(props);
+    request.setTags(tags);
+    request.setAttributes(attributes);
+    request.setMeasurementAlias(measurementAlias);
+    return request;
+  }
+
+  public void createMultiTimeseries(List<String> paths, List<TSDataType> 
dataTypes,
+      List<TSEncoding> encodings, List<CompressionType> compressors,
+      List<Map<String, String>> propsList, List<Map<String, String>> tagsList,
+      List<Map<String, String>> attributesList, List<String> 
measurementAliasList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    TSCreateMultiTimeseriesReq request = genTSCreateMultiTimeseriesReq(paths, 
dataTypes, encodings,
+        compressors, propsList, tagsList, attributesList, 
measurementAliasList);
+    defaultSessionConnection.createMultiTimeseries(request);
+  }
+
+  private TSCreateMultiTimeseriesReq 
genTSCreateMultiTimeseriesReq(List<String> paths,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings, List<CompressionType> compressors,
+      List<Map<String, String>> propsList, List<Map<String, String>> tagsList,
+      List<Map<String, String>> attributesList, List<String> 
measurementAliasList) {
+    TSCreateMultiTimeseriesReq request = new TSCreateMultiTimeseriesReq();
+
+    request.setPaths(paths);
+
+    List<Integer> dataTypeOrdinals = new ArrayList<>(paths.size());
+    for (TSDataType dataType : dataTypes) {
+      dataTypeOrdinals.add(dataType.ordinal());
+    }
+    request.setDataTypes(dataTypeOrdinals);
+
+    List<Integer> encodingOrdinals = new ArrayList<>(paths.size());
+    for (TSEncoding encoding : encodings) {
+      encodingOrdinals.add(encoding.ordinal());
+    }
+    request.setEncodings(encodingOrdinals);
+
+    List<Integer> compressionOrdinals = new ArrayList<>(paths.size());
+    for (CompressionType compression : compressors) {
+      compressionOrdinals.add(compression.ordinal());
+    }
+    request.setCompressors(compressionOrdinals);
+
+    request.setPropsList(propsList);
+    request.setTagsList(tagsList);
+    request.setAttributesList(attributesList);
+    request.setMeasurementAliasList(measurementAliasList);
+
+    return request;
+  }
+
+  public boolean checkTimeseriesExists(String path)
+      throws IoTDBConnectionException, StatementExecutionException {
+    return defaultSessionConnection.checkTimeseriesExists(path);
+  }
+
+  /**
+   * execute query sql
+   *
+   * @param sql query statement
+   * @return result set
+   */
+  public SessionDataSet executeQueryStatement(String sql)
+      throws StatementExecutionException, IoTDBConnectionException {
+    return defaultSessionConnection.executeQueryStatement(sql);
+  }
+
+  /**
+   * execute non query statement
+   *
+   * @param sql non query statement
+   */
+  public void executeNonQueryStatement(String sql)
+      throws IoTDBConnectionException, StatementExecutionException {
+    defaultSessionConnection.executeNonQueryStatement(sql);
+  }
+
+  /**
+   * query eg. select * from paths where time >= startTime and time < endTime 
time interval include
+   * startTime and exclude endTime
+   *
+   * @param paths
+   * @param startTime included
+   * @param endTime   excluded
+   * @return
+   * @throws StatementExecutionException
+   * @throws IoTDBConnectionException
+   */
+
+  public SessionDataSet executeRawDataQuery(List<String> paths, long 
startTime, long endTime)
+      throws StatementExecutionException, IoTDBConnectionException {
+    return defaultSessionConnection.executeRawDataQuery(paths, startTime, 
endTime);
+  }
+
+
+  /**
+   * insert data in one row, if you want to improve your performance, please 
use insertRecords
+   * method or insertTablet method
+   *
+   * @see Session#insertRecords(List, List, List, List, List)
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertRecord(String deviceId, long time, List<String> 
measurements,
+      List<TSDataType> types,
+      Object... values) throws IoTDBConnectionException, 
StatementExecutionException {
+    TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, 
measurements, types,
+        Arrays.asList(values));
+    insertRecord(deviceId, request);
+  }
+
+  private void insertRecord(String deviceId, TSInsertRecordReq request)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      getSessionConnection(deviceId).insertRecord(request);
+    } catch (RedirectException e) {
+      handleRedirection(deviceId, e.getEndPoint());
+    }
+  }
+
+  private void insertRecord(String deviceId, TSInsertStringRecordReq request)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      getSessionConnection(deviceId).insertRecord(request);
+    } catch (RedirectException e) {
+      handleRedirection(deviceId, e.getEndPoint());
+    }
+  }
+
+  private SessionConnection getSessionConnection(String deviceId) {
+    EndPoint endPoint;
+    if (Config.DEFAULT_CACHE_LEADER_MODE
+        && (endPoint = deviceIdToEndpoint.get(deviceId)) != null) {
+      return endPointToSessionConnection.get(endPoint);
+    } else {
+      return defaultSessionConnection;
+    }
+  }
+
+  private void handleMetaRedirection(String storageGroup, RedirectException e)
+      throws IoTDBConnectionException {
+    if (Config.DEFAULT_CACHE_LEADER_MODE) {
+      logger.debug("storageGroup[{}]:{}", storageGroup, e.getMessage());
+      SessionConnection connection = endPointToSessionConnection
+          .computeIfAbsent(e.getEndPoint(), k -> {
+            try {
+              return new SessionConnection(this, e.getEndPoint(), zoneId);
+            } catch (IoTDBConnectionException ex) {
+              tmp.set(ex);
+              return null;
+            }
+          });
+      if (connection == null) {
+        throw new IoTDBConnectionException(tmp.get());
+      }
+      metaSessionConnection = connection;
+    }
+  }
+
+  /**
+   * Remembers that {@code deviceId} is served by {@code endpoint}, opening a connection
+   * to that endpoint if this session does not hold one yet. Does nothing when
+   * {@link Config#DEFAULT_CACHE_LEADER_MODE} is off.
+   *
+   * @param deviceId device whose request was redirected
+   * @param endpoint endpoint carried by the redirect
+   * @throws IoTDBConnectionException if a connection to {@code endpoint} cannot be created
+   */
+  private void handleRedirection(String deviceId, EndPoint endpoint)
+      throws IoTDBConnectionException {
+    if (Config.DEFAULT_CACHE_LEADER_MODE) {
+      SessionConnection connection = endPointToSessionConnection
+          .computeIfAbsent(endpoint, k -> {
+            try {
+              return new SessionConnection(this, endpoint, zoneId);
+            } catch (IoTDBConnectionException ex) {
+              // the lambda cannot throw the checked exception; hand it out through tmp
+              tmp.set(ex);
+              return null;
+            }
+          });
+      if (connection == null) {
+        throw new IoTDBConnectionException(tmp.get());
+      }
+      // Cache the device -> endpoint mapping only after the connection exists. Caching
+      // it first would keep a mapping whose endpoint has no connection when the connect
+      // fails, and getSessionConnection() would then return null for this device.
+      deviceIdToEndpoint.put(deviceId, endpoint);
+    }
+  }
+
+  /**
+   * insert data in one row, if you want to improve your performance, please use insertRecords
+   * method or insertTablet method
+   *
+   * @see Session#insertRecords(List, List, List, List, List)
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertRecord(String deviceId, long time, List<String> 
measurements,
+      List<TSDataType> types,
+      List<Object> values) throws IoTDBConnectionException, 
StatementExecutionException {
+    TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, 
measurements, types, values);
+    insertRecord(deviceId, request);
+  }
+
+  private TSInsertRecordReq genTSInsertRecordReq(String deviceId, long time,
+      List<String> measurements,
+      List<TSDataType> types,
+      List<Object> values) throws IoTDBConnectionException {
+    TSInsertRecordReq request = new TSInsertRecordReq();
+    request.setDeviceId(deviceId);
+    request.setTimestamp(time);
+    request.setMeasurements(measurements);
+    ByteBuffer buffer = ByteBuffer.allocate(calculateLength(types, values));
+    putValues(types, values, buffer);
+    request.setValues(buffer);
+    return request;
+  }
+
+  /**
+   * insert data in one row, if you want to improve your performance, please use insertRecords
+   * method or insertTablet method
+   *
+   * @see Session#insertRecords(List, List, List, List, List)
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertRecord(String deviceId, long time, List<String> 
measurements,
+      List<String> values) throws IoTDBConnectionException, 
StatementExecutionException {
+    TSInsertStringRecordReq request = genTSInsertStringRecordReq(deviceId, 
time, measurements,
+        values);
+    insertRecord(deviceId, request);
+  }
+
+  private TSInsertStringRecordReq genTSInsertStringRecordReq(String deviceId, 
long time,
+      List<String> measurements, List<String> values) {
+    TSInsertStringRecordReq request = new TSInsertStringRecordReq();
+    request.setDeviceId(deviceId);
+    request.setTimestamp(time);
+    request.setMeasurements(measurements);
+    request.setValues(values);
+    return request;
+  }
+
+  /**
+   * Insert multiple rows, which can reduce the overhead of network. This 
method is just like jdbc
+   * executeBatch, we pack some insert request in batch and send them to 
server. If you want improve
+   * your performance, please see insertTablet method
+   * <p>
+   * Each row is independent, which could have different deviceId, time, 
number of measurements
+   *
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertRecords(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    int len = deviceIds.size();
+    if (len != times.size() || len != measurementsList.size() || len != 
valuesList.size()) {
+      throw new IllegalArgumentException(
+          "deviceIds, times, measurementsList and valuesList's size should be 
equal");
+    }
+    if (Config.DEFAULT_CACHE_LEADER_MODE) {
+      insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, 
valuesList);
+    } else {
+      TSInsertStringRecordsReq request = 
genTSInsertStringRecordsReq(deviceIds, times,
+          measurementsList, valuesList);
+      try {
+        defaultSessionConnection.insertRecords(request);
+      } catch (RedirectException ignored) {
+        // ignore
+      }
+    }
+  }
+
+  private void insertStringRecordsWithLeaderCache(List<String> deviceIds, 
List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    Map<String, TSInsertStringRecordsReq> deviceGroup = new HashMap<>();
+    for (int i = 0; i < deviceIds.size(); i++) {
+      TSInsertStringRecordsReq request = deviceGroup
+          .computeIfAbsent(deviceIds.get(i), k -> new 
TSInsertStringRecordsReq());
+      updateTSInsertStringRecordsReq(request, deviceIds.get(i), times.get(i),
+          measurementsList.get(i), valuesList.get(i));
+    }
+    //TODO parallel
+    StringBuilder errMsgBuilder = new StringBuilder();
+    for (Entry<String, TSInsertStringRecordsReq> entry : 
deviceGroup.entrySet()) {
+      try {
+        getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
+      } catch (RedirectException e) {
+        handleRedirection(entry.getKey(), e.getEndPoint());
+      } catch (StatementExecutionException e) {
+        errMsgBuilder.append(e.getMessage());
+      }
+    }
+    String errMsg = errMsgBuilder.toString();
+    if (!errMsg.isEmpty()) {
+      throw new StatementExecutionException(errMsg);
+    }
+  }
+
+  private TSInsertStringRecordsReq genTSInsertStringRecordsReq(List<String> 
deviceId,
+      List<Long> time,
+      List<List<String>> measurements, List<List<String>> values) {
+    TSInsertStringRecordsReq request = new TSInsertStringRecordsReq();
+    request.setDeviceIds(deviceId);
+    request.setTimestamps(time);
+    request.setMeasurementsList(measurements);
+    request.setValuesList(values);
+    return request;
+  }
+
+  private void updateTSInsertStringRecordsReq(TSInsertStringRecordsReq request,
+      String deviceId, long time,
+      List<String> measurements, List<String> values) {
+    request.addToDeviceIds(deviceId);
+    request.addToTimestamps(time);
+    request.addToMeasurementsList(measurements);
+    request.addToValuesList(values);
+  }
+
+  /**
+   * Insert multiple rows, which can reduce the overhead of network. This method is just
+   * like jdbc executeBatch, we pack some insert request in batch and send them to server.
+   * If you want to improve your performance, please see insertTablet method
+   * <p>
+   * Each row is independent, which could have different deviceId, time, number of
+   * measurements. Row {@code i} is described by the {@code i}-th element of every list.
+   *
+   * @param deviceIds        device of each row
+   * @param times            timestamp of each row
+   * @param measurementsList measurement names of each row
+   * @param typesList        data types of each row's values
+   * @param valuesList       values of each row
+   * @throws IllegalArgumentException if the five lists do not all have the same size
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertRecords(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    int len = deviceIds.size();
+    // typesList is indexed per row below just like the other lists, so it has to be
+    // part of the size check as well.
+    if (len != times.size() || len != measurementsList.size() || len != typesList.size()
+        || len != valuesList.size()) {
+      throw new IllegalArgumentException(
+          "deviceIds, times, measurementsList, typesList and valuesList's size should be equal");
+    }
+    if (Config.DEFAULT_CACHE_LEADER_MODE) {
+      insertRecordsWithLeaderCache(deviceIds, times, measurementsList, typesList, valuesList);
+    } else {
+      TSInsertRecordsReq request = genTSInsertRecordsReq(deviceIds, times, measurementsList,
+          typesList, valuesList);
+      try {
+        defaultSessionConnection
+            .insertRecords(request);
+      } catch (RedirectException ignored) {
+        // ignore
+      }
+    }
+  }
+
+  private void insertRecordsWithLeaderCache(List<String> deviceIds, List<Long> 
times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    Map<String, TSInsertRecordsReq> deviceGroup = new HashMap<>();
+    for (int i = 0; i < deviceIds.size(); i++) {
+      TSInsertRecordsReq request = deviceGroup
+          .computeIfAbsent(deviceIds.get(i), k -> new TSInsertRecordsReq());
+      updateTSInsertRecordsReq(request, deviceIds.get(i), times.get(i),
+          measurementsList.get(i), typesList.get(i), valuesList.get(i));
+    }
+    //TODO parallel
+    StringBuilder errMsgBuilder = new StringBuilder();
+    for (Entry<String, TSInsertRecordsReq> entry : deviceGroup.entrySet()) {
+      try {
+        getSessionConnection(entry.getKey()).insertRecords(entry.getValue());
+      } catch (RedirectException e) {
+        handleRedirection(entry.getKey(), e.getEndPoint());
+      } catch (StatementExecutionException e) {
+        errMsgBuilder.append(e.getMessage());
+      }
+    }
+    String errMsg = errMsgBuilder.toString();
+    if (!errMsg.isEmpty()) {
+      throw new StatementExecutionException(errMsg);
+    }
+  }
+
+  private TSInsertRecordsReq genTSInsertRecordsReq(List<String> deviceIds, 
List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList) throws IoTDBConnectionException {
+    TSInsertRecordsReq request = new TSInsertRecordsReq();
+    request.setDeviceIds(deviceIds);
+    request.setTimestamps(times);
+    request.setMeasurementsList(measurementsList);
+    List<ByteBuffer> buffersList = new ArrayList<>();
+    for (int i = 0; i < measurementsList.size(); i++) {
+      ByteBuffer buffer = 
ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));
+      putValues(typesList.get(i), valuesList.get(i), buffer);
+      buffersList.add(buffer);
+    }
+    request.setValuesList(buffersList);
+    return request;
+  }
+
+  private void updateTSInsertRecordsReq(TSInsertRecordsReq request, String 
deviceId, Long time,
+      List<String> measurements, List<TSDataType> types,
+      List<Object> values) throws IoTDBConnectionException {
+    request.addToDeviceIds(deviceId);
+    request.addToTimestamps(time);
+    request.addToMeasurementsList(measurements);
+    ByteBuffer buffer = ByteBuffer.allocate(calculateLength(types, values));
+    putValues(types, values, buffer);
+    request.addToValuesList(buffer);
+  }
+
+  /**
+   * Sends the data of one device as a single tablet. For each timestamp, the number of
+   * measurements is the same.
+   * <p>
+   * Times in the Tablet may be not in ascending order; the request is built with
+   * {@code sorted == false}, so the tablet goes through sortTablet before it is sent.
+   *
+   * @param tablet data batch
+   */
+  public void insertTablet(Tablet tablet)
+      throws StatementExecutionException, IoTDBConnectionException {
+    final TSInsertTabletReq tabletReq = genTSInsertTabletReq(tablet, false);
+    final String device = tablet.deviceId;
+    try {
+      // cached endpoint connection for this device if there is one, default otherwise
+      getSessionConnection(device).insertTablet(tabletReq);
+    } catch (RedirectException redirect) {
+      handleRedirection(device, redirect.getEndPoint());
+    }
+  }
+
+  /**
+   * Sends the data of one device as a single tablet.
+   *
+   * @param tablet data batch
+   * @param sorted whether times in Tablet are in ascending order; when {@code true} the
+   *               order is only checked, when {@code false} the tablet is sorted first
+   */
+  public void insertTablet(Tablet tablet, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final TSInsertTabletReq tabletReq = genTSInsertTabletReq(tablet, sorted);
+    final String device = tablet.deviceId;
+    try {
+      // cached endpoint connection for this device if there is one, default otherwise
+      getSessionConnection(device).insertTablet(tabletReq);
+    } catch (RedirectException redirect) {
+      handleRedirection(device, redirect.getEndPoint());
+    }
+  }
+
+  /**
+   * Builds the insert request for one Tablet. If the caller claims the tablet is sorted the
+   * claim is verified, otherwise the tablet is sorted by time first.
+   *
+   * @param tablet data batch
+   * @param sorted whether times in Tablet are declared to be in ascending order
+   * @return the populated request
+   * @throws BatchExecutionException if {@code sorted} is true but the times are not ascending
+   */
+  private TSInsertTabletReq genTSInsertTabletReq(Tablet tablet, boolean sorted)
+      throws BatchExecutionException {
+    if (!sorted) {
+      sortTablet(tablet);
+    } else {
+      checkSortedThrowable(tablet);
+    }
+
+    TSInsertTabletReq insertReq = new TSInsertTabletReq();
+    insertReq.setDeviceId(tablet.deviceId);
+    for (MeasurementSchema schema : tablet.getSchemas()) {
+      insertReq.addToMeasurements(schema.getMeasurementId());
+      insertReq.addToTypes(schema.getType().ordinal());
+    }
+    insertReq.setTimestamps(SessionUtils.getTimeBuffer(tablet));
+    insertReq.setValues(SessionUtils.getValueBuffer(tablet));
+    insertReq.setSize(tablet.rowSize);
+    return insertReq;
+  }
+
+  /**
+   * insert the data of several devices. Given a device, for each timestamp, the number of
+   * measurements is the same.
+   * <p>
+   * Times in each Tablet may not be in ascending order; this overload treats every tablet as
+   * unsorted.
+   *
+   * @param tablets data batch in multiple device
+   */
+  public void insertTablets(Map<String, Tablet> tablets)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final boolean sorted = false;
+    insertTablets(tablets, sorted);
+  }
+
+
+  /**
+   * insert the data of several devices. Given a device, for each timestamp, the number of
+   * measurements is the same.
+   * <p>
+   * With leader caching enabled the tablets are grouped per connection; otherwise a single
+   * request is sent over the default connection.
+   *
+   * @param tablets data batch in multiple device
+   * @param sorted  whether times in each Tablet are in ascending order
+   */
+  public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    if (!Config.DEFAULT_CACHE_LEADER_MODE) {
+      List<Tablet> tabletList = new ArrayList<>(tablets.values());
+      TSInsertTabletsReq batchReq = genTSInsertTabletsReq(tabletList, sorted);
+      try {
+        defaultSessionConnection.insertTablets(batchReq);
+      } catch (RedirectException ignored) {
+        // ignored: the redirection hint is dropped when leader caching is off
+      }
+      return;
+    }
+    insertTabletsWithLeaderCache(tablets, sorted);
+  }
+
+  /**
+   * Just for test: sends an already-built multi-tablet request over the default connection.
+   * <p>
+   * Redirections are ignored, and a {@link StatementExecutionException} is only printed to
+   * stderr rather than propagated.
+   * NOTE(review): consider logging or rethrowing the execution failure instead of
+   * printStackTrace() - confirm what test callers expect.
+   *
+   * @param request fully populated request
+   * @throws IoTDBConnectionException if the connection fails
+   */
+  public void insertTablets(TSInsertTabletsReq request) throws IoTDBConnectionException{
+    try {
+      defaultSessionConnection.insertTablets(request);
+    } catch (StatementExecutionException executionFailure) {
+      executionFailure.printStackTrace();
+    } catch (RedirectException ignored) {
+      // ignored
+    }
+  }
+
+
+  /**
+   * Groups the tablets by the connection that should receive them (cached leader endpoint of
+   * the device, or the default connection when none is cached) and sends one request per
+   * connection. Redirections update the cache through {@code handleRedirection}; execution
+   * failures are collected and reported together once every group has been sent.
+   *
+   * @param tablets data batch keyed by device id
+   * @param sorted  whether times in each Tablet are in ascending order
+   * @throws StatementExecutionException if at least one group failed; the message is the
+   *                                     concatenation of the individual messages
+   */
+  private void insertTabletsWithLeaderCache(Map<String, Tablet> tablets, boolean sorted) throws
+      IoTDBConnectionException, StatementExecutionException {
+    Map<SessionConnection, TSInsertTabletsReq> requestsByConnection = new HashMap<>();
+    for (Entry<String, Tablet> deviceTablet : tablets.entrySet()) {
+      EndPoint leader = deviceIdToEndpoint.get(deviceTablet.getKey());
+      SessionConnection target =
+          leader == null ? defaultSessionConnection : endPointToSessionConnection.get(leader);
+      TSInsertTabletsReq groupedReq =
+          requestsByConnection.computeIfAbsent(target, k -> new TSInsertTabletsReq());
+      updateTSInsertTabletsReq(groupedReq, deviceTablet.getValue(), sorted);
+    }
+
+    //TODO parallel
+    StringBuilder failures = new StringBuilder();
+    for (Entry<SessionConnection, TSInsertTabletsReq> pending : requestsByConnection.entrySet()) {
+      try {
+        pending.getKey().insertTablets(pending.getValue());
+      } catch (RedirectException redirect) {
+        for (Entry<String, EndPoint> moved : redirect.getDeviceEndPointMap().entrySet()) {
+          handleRedirection(moved.getKey(), moved.getValue());
+        }
+      } catch (StatementExecutionException executionFailure) {
+        failures.append(executionFailure.getMessage());
+      }
+    }
+    if (failures.length() > 0) {
+      throw new StatementExecutionException(failures.toString());
+    }
+  }
+
+  /**
+   * Builds one multi-tablet request containing every tablet of the list, in list order.
+   *
+   * @param tablets tablets to include
+   * @param sorted  whether times in each Tablet are in ascending order
+   * @return the populated request
+   * @throws BatchExecutionException if {@code sorted} is true but a tablet is not ascending
+   */
+  private TSInsertTabletsReq genTSInsertTabletsReq(List<Tablet> tablets, boolean sorted)
+      throws BatchExecutionException {
+    TSInsertTabletsReq batchReq = new TSInsertTabletsReq();
+    for (Tablet current : tablets) {
+      updateTSInsertTabletsReq(batchReq, current, sorted);
+    }
+    return batchReq;
+  }
+
+  /**
+   * Appends one tablet (device id, measurement names, type ordinals, time buffer, value buffer
+   * and row count) to a multi-tablet request. The tablet is verified to be sorted when
+   * {@code sorted} is true and sorted by time otherwise.
+   *
+   * @param request request being built
+   * @param tablet  tablet to append
+   * @param sorted  whether times in the Tablet are declared to be in ascending order
+   * @throws BatchExecutionException if {@code sorted} is true but the times are not ascending
+   */
+  private void updateTSInsertTabletsReq(TSInsertTabletsReq request, Tablet tablet, boolean sorted)
+      throws BatchExecutionException {
+    if (!sorted) {
+      sortTablet(tablet);
+    } else {
+      checkSortedThrowable(tablet);
+    }
+
+    request.addToDeviceIds(tablet.deviceId);
+    List<String> measurementIds = new ArrayList<>();
+    List<Integer> typeOrdinals = new ArrayList<>();
+    for (MeasurementSchema schema : tablet.getSchemas()) {
+      measurementIds.add(schema.getMeasurementId());
+      typeOrdinals.add(schema.getType().ordinal());
+    }
+    request.addToMeasurementsList(measurementIds);
+    request.addToTypesList(typeOrdinals);
+    request.addToTimestampsList(SessionUtils.getTimeBuffer(tablet));
+    request.addToValuesList(SessionUtils.getValueBuffer(tablet));
+    request.addToSizeList(tablet.rowSize);
+  }
+
+  /**
+   * This method does NOT insert data into the database; the server just returns after
+   * accepting the request. It should be used to measure the other time costs in the client.
+   * The tablet is treated as unsorted.
+   *
+   * @param tablet data batch
+   */
+  public void testInsertTablet(Tablet tablet)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final boolean sorted = false;
+    testInsertTablet(tablet, sorted);
+  }
+
+  /**
+   * This method does NOT insert data into the database; the server just returns after
+   * accepting the request. It should be used to measure the other time costs in the client.
+   *
+   * @param tablet data batch
+   * @param sorted whether times in Tablet are in ascending order
+   */
+  public void testInsertTablet(Tablet tablet, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    TSInsertTabletReq probeReq = genTSInsertTabletReq(tablet, sorted);
+    defaultSessionConnection.testInsertTablet(probeReq);
+  }
+
+  /**
+   * This method does NOT insert data into the database; the server just returns after
+   * accepting the request. It should be used to measure the other time costs in the client.
+   * Every tablet is treated as unsorted.
+   *
+   * @param tablets data batch in multiple device
+   */
+  public void testInsertTablets(Map<String, Tablet> tablets)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final boolean sorted = false;
+    testInsertTablets(tablets, sorted);
+  }
+
+  /**
+   * This method does NOT insert data into the database; the server just returns after
+   * accepting the request. It should be used to measure the other time costs in the client.
+   *
+   * @param tablets data batch in multiple device
+   * @param sorted  whether times in each Tablet are in ascending order
+   */
+  public void testInsertTablets(Map<String, Tablet> tablets, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    List<Tablet> tabletList = new ArrayList<>(tablets.values());
+    TSInsertTabletsReq probeReq = genTSInsertTabletsReq(tabletList, sorted);
+    defaultSessionConnection.testInsertTablets(probeReq);
+  }
+
+  /**
+   * This method does NOT insert data into the database; the server just returns after
+   * accepting the request. It should be used to measure the other time costs in the client.
+   * Values are given as strings.
+   *
+   * @param deviceIds        device of each record
+   * @param times            timestamp of each record
+   * @param measurementsList measurement names of each record
+   * @param valuesList       string values of each record
+   */
+  public void testInsertRecords(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    TSInsertStringRecordsReq probeReq =
+        genTSInsertStringRecordsReq(deviceIds, times, measurementsList, valuesList);
+    defaultSessionConnection.testInsertRecords(probeReq);
+  }
+
+  /**
+   * This method does NOT insert data into the database; the server just returns after
+   * accepting the request. It should be used to measure the other time costs in the client.
+   * Values are given as typed objects.
+   *
+   * @param deviceIds        device of each record
+   * @param times            timestamp of each record
+   * @param measurementsList measurement names of each record
+   * @param typesList        data types of each record's values
+   * @param valuesList       values of each record
+   */
+  public void testInsertRecords(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    TSInsertRecordsReq probeReq =
+        genTSInsertRecordsReq(deviceIds, times, measurementsList, typesList, valuesList);
+    defaultSessionConnection.testInsertRecords(probeReq);
+  }
+
+  /**
+   * This method does NOT insert data into the database; the server just returns after
+   * accepting the request. It should be used to measure the other time costs in the client.
+   * Values are given as strings.
+   *
+   * @param deviceId     device of the record
+   * @param time         timestamp of the record
+   * @param measurements measurement names
+   * @param values       string values
+   */
+  public void testInsertRecord(String deviceId, long time, List<String> measurements,
+      List<String> values) throws IoTDBConnectionException, StatementExecutionException {
+    TSInsertStringRecordReq probeReq =
+        genTSInsertStringRecordReq(deviceId, time, measurements, values);
+    defaultSessionConnection.testInsertRecord(probeReq);
+  }
+
+  /**
+   * This method does NOT insert data into the database; the server just returns after
+   * accepting the request. It should be used to measure the other time costs in the client.
+   * Values are given as typed objects.
+   *
+   * @param deviceId     device of the record
+   * @param time         timestamp of the record
+   * @param measurements measurement names
+   * @param types        data type of each value
+   * @param values       values of the record
+   */
+  public void testInsertRecord(String deviceId, long time, List<String> measurements,
+      List<TSDataType> types, List<Object> values)
+      throws IoTDBConnectionException, StatementExecutionException {
+    TSInsertRecordReq probeReq =
+        genTSInsertRecordReq(deviceId, time, measurements, types, values);
+    defaultSessionConnection.testInsertRecord(probeReq);
+  }
+
+  /**
+   * delete a timeseries, including data and schema
+   *
+   * @param path timeseries to delete, should be a whole path
+   */
+  public void deleteTimeseries(String path)
+      throws IoTDBConnectionException, StatementExecutionException {
+    List<String> singlePath = Collections.singletonList(path);
+    defaultSessionConnection.deleteTimeseries(singlePath);
+  }
+
+  /**
+   * delete several timeseries, including data and schema, through the default connection
+   *
+   * @param paths timeseries to delete, each should be a whole path
+   */
+  public void deleteTimeseries(List<String> paths)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final List<String> toDelete = paths;
+    defaultSessionConnection.deleteTimeseries(toDelete);
+  }
+
+  /**
+   * delete data &lt;= time in one timeseries; the range starts at {@code Long.MIN_VALUE}
+   *
+   * @param path    data in which time series to delete
+   * @param endTime data with time stamp less than or equal to time will be deleted
+   */
+  public void deleteData(String path, long endTime)
+      throws IoTDBConnectionException, StatementExecutionException {
+    List<String> singlePath = Collections.singletonList(path);
+    deleteData(singlePath, Long.MIN_VALUE, endTime);
+  }
+
+  /**
+   * delete data &lt;= time in multiple timeseries; the range starts at {@code Long.MIN_VALUE}
+   *
+   * @param paths   data in which time series to delete
+   * @param endTime data with time stamp less than or equal to time will be deleted
+   */
+  public void deleteData(List<String> paths, long endTime)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final long startTime = Long.MIN_VALUE;
+    deleteData(paths, startTime, endTime);
+  }
+
+  /**
+   * delete data &gt;= startTime and data &lt;= endTime in multiple timeseries
+   *
+   * @param paths     data in which time series to delete
+   * @param startTime delete range start time
+   * @param endTime   delete range end time
+   */
+  public void deleteData(List<String> paths, long startTime, long endTime)
+      throws IoTDBConnectionException, StatementExecutionException {
+    TSDeleteDataReq deleteReq = genTSDeleteDataReq(paths, startTime, endTime);
+    defaultSessionConnection.deleteData(deleteReq);
+  }
+
+  /**
+   * Builds a delete request covering the given paths and time range.
+   *
+   * @param paths     timeseries whose data is deleted
+   * @param startTime delete range start time
+   * @param endTime   delete range end time
+   * @return the populated request
+   */
+  private TSDeleteDataReq genTSDeleteDataReq(List<String> paths, long startTime, long endTime) {
+    TSDeleteDataReq deleteReq = new TSDeleteDataReq();
+    deleteReq.setStartTime(startTime);
+    deleteReq.setEndTime(endTime);
+    deleteReq.setPaths(paths);
+    return deleteReq;
+  }
+
+  private int calculateLength(List<TSDataType> types, List<Object> values)
+      throws IoTDBConnectionException {
+    int res = 0;
+    for (int i = 0; i < types.size(); i++) {
+      // types
+      res += Short.BYTES;
+      switch (types.get(i)) {
+        case BOOLEAN:
+          res += 1;
+          break;
+        case INT32:
+          res += Integer.BYTES;
+          break;
+        case INT64:
+          res += Long.BYTES;
+          break;
+        case FLOAT:
+          res += Float.BYTES;
+          break;
+        case DOUBLE:
+          res += Double.BYTES;
+          break;
+        case TEXT:
+          res += Integer.BYTES;
+          res += ((String) 
values.get(i)).getBytes(TSFileConfig.STRING_CHARSET).length;
+          break;
+        default:
+          throw new IoTDBConnectionException(MSG_UNSUPPORTED_DATA_TYPE + 
types.get(i));
+      }
+    }
+    return res;
+  }
+
+  /**
+   * put value in buffer
+   *
+   * @param types  types list
+   * @param values values list
+   * @param buffer buffer to insert
+   * @throws IoTDBConnectionException
+   */
+  private void putValues(List<TSDataType> types, List<Object> values, 
ByteBuffer buffer)
+      throws IoTDBConnectionException {
+    for (int i = 0; i < values.size(); i++) {
+      ReadWriteIOUtils.write(types.get(i), buffer);
+      switch (types.get(i)) {
+        case BOOLEAN:
+          ReadWriteIOUtils.write((Boolean) values.get(i), buffer);
+          break;
+        case INT32:
+          ReadWriteIOUtils.write((Integer) values.get(i), buffer);
+          break;
+        case INT64:
+          ReadWriteIOUtils.write((Long) values.get(i), buffer);
+          break;
+        case FLOAT:
+          ReadWriteIOUtils.write((Float) values.get(i), buffer);
+          break;
+        case DOUBLE:
+          ReadWriteIOUtils.write((Double) values.get(i), buffer);
+          break;
+        case TEXT:
+          byte[] bytes = ((String) 
values.get(i)).getBytes(TSFileConfig.STRING_CHARSET);
+          ReadWriteIOUtils.write(bytes.length, buffer);
+          buffer.put(bytes);
+          break;
+        default:
+          throw new IoTDBConnectionException(MSG_UNSUPPORTED_DATA_TYPE + 
types.get(i));
+      }
+    }
+    buffer.flip();
+  }
+
+  /**
+   * check whether the first {@code rowSize} timestamps of the batch are in non-descending order
+   *
+   * @return whether the batch has been sorted
+   */
+  private boolean checkSorted(Tablet tablet) {
+    // walk adjacent pairs from the end; any pair that decreases means "unsorted"
+    for (int row = tablet.rowSize - 1; row > 0; row--) {
+      if (tablet.timestamps[row - 1] > tablet.timestamps[row]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Verifies that the tablet is sorted by time.
+   *
+   * @param tablet data batch to verify
+   * @throws BatchExecutionException if the times are not in ascending order
+   */
+  private void checkSortedThrowable(Tablet tablet) throws BatchExecutionException {
+    if (checkSorted(tablet)) {
+      return;
+    }
+    throw new BatchExecutionException("Times in Tablet are not in ascending order");
+  }
+
+  protected void sortTablet(Tablet tablet) {
+    /*
+     * following part of code sort the batch data by time,
+     * so we can insert continuous data in value list to get a better 
performance
+     */
+    // sort to get index, and use index to sort value list
+    Integer[] index = new Integer[tablet.rowSize];
+    for (int i = 0; i < tablet.rowSize; i++) {
+      index[i] = i;
+    }
+    Arrays.sort(index, Comparator.comparingLong(o -> tablet.timestamps[o]));
+    Arrays.sort(tablet.timestamps, 0, tablet.rowSize);
+    for (int i = 0; i < tablet.getSchemas().size(); i++) {
+      tablet.values[i] =
+          sortList(tablet.values[i], tablet.getSchemas().get(i).getType(), 
index);
+    }
+  }
+
+  /**
+   * sort value list by index
+   *
+   * @param valueList value list
+   * @param dataType  data type
+   * @param index     index
+   * @return sorted list
+   */
+  private Object sortList(Object valueList, TSDataType dataType, Integer[] 
index) {
+    switch (dataType) {
+      case BOOLEAN:
+        boolean[] boolValues = (boolean[]) valueList;
+        boolean[] sortedValues = new boolean[boolValues.length];
+        for (int i = 0; i < index.length; i++) {
+          sortedValues[i] = boolValues[index[i]];
+        }
+        return sortedValues;
+      case INT32:
+        int[] intValues = (int[]) valueList;
+        int[] sortedIntValues = new int[intValues.length];
+        for (int i = 0; i < index.length; i++) {
+          sortedIntValues[i] = intValues[index[i]];
+        }
+        return sortedIntValues;
+      case INT64:
+        long[] longValues = (long[]) valueList;
+        long[] sortedLongValues = new long[longValues.length];
+        for (int i = 0; i < index.length; i++) {
+          sortedLongValues[i] = longValues[index[i]];
+        }
+        return sortedLongValues;
+      case FLOAT:
+        float[] floatValues = (float[]) valueList;
+        float[] sortedFloatValues = new float[floatValues.length];
+        for (int i = 0; i < index.length; i++) {
+          sortedFloatValues[i] = floatValues[index[i]];
+        }
+        return sortedFloatValues;
+      case DOUBLE:
+        double[] doubleValues = (double[]) valueList;
+        double[] sortedDoubleValues = new double[doubleValues.length];
+        for (int i = 0; i < index.length; i++) {
+          sortedDoubleValues[i] = doubleValues[index[i]];
+        }
+        return sortedDoubleValues;
+      case TEXT:
+        Binary[] binaryValues = (Binary[]) valueList;
+        Binary[] sortedBinaryValues = new Binary[binaryValues.length];
+        for (int i = 0; i < index.length; i++) {
+          sortedBinaryValues[i] = binaryValues[index[i]];
+        }
+        return sortedBinaryValues;
+      default:
+        throw new UnSupportedDataTypeException(MSG_UNSUPPORTED_DATA_TYPE + 
dataType);
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionConnection.java
 
b/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionConnection.java
new file mode 100644
index 0000000..91d7ac4
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionConnection.java
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.serviceSession;
+
+import java.time.ZoneId;
+import java.util.List;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.RedirectException;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.EndPoint;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
+import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SessionConnection {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SessionConnection.class);
+  public static final String MSG_RECONNECTION_FAIL = "Fail to reconnect to 
server. Please check server status";
+  private Session session; // owning session; supplies credentials, timeout, compression flag and fetch size
+  private TTransport transport; // opened in init(), closed in close()
+  private TSIService.Iface client; // Thrift client, wrapped by RpcUtils.newSynchronizedClient in init()
+  private long sessionId; // taken from the open-session response in init()
+  private long statementId; // obtained via client.requestStatementId in init()
+  private ZoneId zoneId; // falls back to ZoneId.systemDefault() when none is supplied
+  private EndPoint endPoint; // endpoint passed to the constructor
+
+  /**
+   * Creates a connection for the given session and immediately opens it against the endpoint.
+   *
+   * @param session  owning session
+   * @param endPoint server endpoint to connect to
+   * @param zoneId   time zone of the connection; the system default is used when null
+   * @throws IoTDBConnectionException if the connection cannot be opened
+   */
+  public SessionConnection(Session session, EndPoint endPoint, ZoneId zoneId)
+      throws IoTDBConnectionException {
+    this.session = session;
+    this.endPoint = endPoint;
+    if (zoneId != null) {
+      this.zoneId = zoneId;
+    } else {
+      this.zoneId = ZoneId.systemDefault();
+    }
+    init(endPoint);
+  }
+
+  /**
+   * Opens the transport to the endpoint, creates the Thrift client (compact protocol when RPC
+   * compression is enabled, binary otherwise), opens a server session and stores the resulting
+   * session id and statement id.
+   * <p>
+   * A protocol version mismatch is only logged, unless the server reports version 0 (older
+   * than 0.10), which is rejected. If anything fails after the transport has been opened, the
+   * transport is closed before the exception is rethrown.
+   *
+   * @param endPoint server endpoint to connect to
+   * @throws IoTDBConnectionException if the transport cannot be opened or the session cannot be
+   *                                  established
+   */
+  private void init(EndPoint endPoint) throws IoTDBConnectionException {
+    transport = RpcTransportFactory.INSTANCE.getTransport(
+        new TSocket(endPoint.getIp(), endPoint.getPort(), session.connectionTimeoutInMs));
+
+    try {
+      transport.open();
+    } catch (TTransportException e) {
+      throw new IoTDBConnectionException(e);
+    }
+
+    if (session.enableRPCCompression) {
+      client = new TSIService.Client(new TCompactProtocol(transport));
+    } else {
+      client = new TSIService.Client(new TBinaryProtocol(transport));
+    }
+    client = RpcUtils.newSynchronizedClient(client);
+
+    TSOpenSessionReq openReq = new TSOpenSessionReq();
+    openReq.setUsername(session.username);
+    openReq.setPassword(session.password);
+    openReq.setZoneId(zoneId.toString());
+
+    try {
+      TSOpenSessionResp openResp = client.openSession(openReq);
+
+      RpcUtils.verifySuccess(openResp.getStatus());
+
+      if (Session.protocolVersion.getValue() != openResp.getServerProtocolVersion().getValue()) {
+        // placeholder fixed: the previous "{}}" printed a stray closing brace
+        logger.warn("Protocol differ, Client version is {}, but Server version is {}",
+            Session.protocolVersion.getValue(), openResp.getServerProtocolVersion().getValue());
+        // less than 0.10
+        if (openResp.getServerProtocolVersion().getValue() == 0) {
+          throw new TException(String
+              .format("Protocol not supported, Client version is %s, but Server version is %s",
+                  Session.protocolVersion.getValue(),
+                  openResp.getServerProtocolVersion().getValue()));
+        }
+      }
+
+      sessionId = openResp.getSessionId();
+      statementId = client.requestStatementId(sessionId);
+
+    } catch (Exception e) {
+      // do not leak the already opened transport when the handshake fails
+      transport.close();
+      throw new IoTDBConnectionException(e);
+    }
+  }
+
+
+  public void close() throws IoTDBConnectionException {
+    TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
+    try {
+      client.closeSession(req);
+    } catch (TException e) {
+      throw new IoTDBConnectionException(
+          "Error occurs when closing session at server. Maybe server is 
down.", e);
+    } finally {
+      if (transport != null) {
+        transport.close();
+      }
+    }
+  }
+
+  /**
+   * Sets the time zone on the server and, once the server accepted it, on this connection.
+   * On a Thrift error the call is retried once if {@code reconnect()} succeeds.
+   *
+   * @param zoneId time zone id to apply
+   * @throws StatementExecutionException if the server rejects the request
+   * @throws IoTDBConnectionException    if reconnecting fails or the retry fails
+   */
+  protected void setTimeZone(String zoneId)
+      throws StatementExecutionException, IoTDBConnectionException {
+    TSSetTimeZoneReq zoneReq = new TSSetTimeZoneReq(sessionId, zoneId);
+    TSStatus status;
+    try {
+      status = client.setTimeZone(zoneReq);
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(
+            MSG_RECONNECTION_FAIL);
+      }
+      try {
+        // resend with the current session id
+        zoneReq.setSessionId(sessionId);
+        status = client.setTimeZone(zoneReq);
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+    RpcUtils.verifySuccess(status);
+    this.zoneId = ZoneId.of(zoneId);
+  }
+
+  /**
+   * Returns the id of the connection's time zone, initializing it to the system default when
+   * it is not set.
+   *
+   * @return time zone id as a string
+   */
+  protected String getTimeZone() {
+    if (null == zoneId) {
+      // lazily fall back to the JVM default and remember it
+      zoneId = ZoneId.systemDefault();
+    }
+    String zoneName = zoneId.toString();
+    return zoneName;
+  }
+
+  /**
+   * Creates a storage group. On a Thrift error the call is retried once if
+   * {@code reconnect()} succeeds; the retry does not report redirections.
+   *
+   * @param storageGroup storage group to set
+   * @throws IoTDBConnectionException    if reconnecting fails or the retry fails
+   * @throws StatementExecutionException if the server rejects the request
+   * @throws RedirectException           if the first attempt answers with a redirection
+   */
+  protected void setStorageGroup(String storageGroup)
+      throws IoTDBConnectionException, StatementExecutionException, RedirectException {
+    try {
+      RpcUtils.verifySuccessWithRedirection(client.setStorageGroup(sessionId, storageGroup));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+      try {
+        RpcUtils.verifySuccess(client.setStorageGroup(sessionId, storageGroup));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Deletes the given storage groups. On a Thrift error the call is retried once if
+   * {@code reconnect()} succeeds; the retry does not report redirections.
+   *
+   * @param storageGroups storage groups to delete
+   * @throws IoTDBConnectionException    if reconnecting fails or the retry fails
+   * @throws StatementExecutionException if the server rejects the request
+   * @throws RedirectException           if the first attempt answers with a redirection
+   */
+  protected void deleteStorageGroups(List<String> storageGroups)
+      throws IoTDBConnectionException, StatementExecutionException, RedirectException {
+    try {
+      RpcUtils.verifySuccessWithRedirection(client.deleteStorageGroups(sessionId, storageGroups));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+      try {
+        RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId, storageGroups));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Creates one timeseries. The session id is stamped on the request before sending; on a
+   * Thrift error the call is retried once if {@code reconnect()} succeeds.
+   *
+   * @param request create-timeseries request
+   * @throws IoTDBConnectionException    if reconnecting fails or the retry fails
+   * @throws StatementExecutionException if the server rejects the request
+   */
+  protected void createTimeseries(TSCreateTimeseriesReq request)
+      throws IoTDBConnectionException, StatementExecutionException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccess(client.createTimeseries(request));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+      try {
+        // resend with the current session id
+        request.setSessionId(sessionId);
+        RpcUtils.verifySuccess(client.createTimeseries(request));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Creates several timeseries in one call. The session id is stamped on the request before
+   * sending; on a Thrift error the call is retried once if {@code reconnect()} succeeds.
+   *
+   * @param request create-multi-timeseries request
+   * @throws IoTDBConnectionException    if reconnecting fails or the retry fails
+   * @throws StatementExecutionException if the server rejects the request
+   */
+  protected void createMultiTimeseries(TSCreateMultiTimeseriesReq request)
+      throws IoTDBConnectionException, StatementExecutionException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccess(client.createMultiTimeseries(request));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(
+            MSG_RECONNECTION_FAIL);
+      }
+      try {
+        // resend with the current session id
+        request.setSessionId(sessionId);
+        RpcUtils.verifySuccess(client.createMultiTimeseries(request));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Checks whether a timeseries exists by running {@code SHOW TIMESERIES <path>} and testing
+   * whether the result has at least one row. The query handle is closed afterwards.
+   *
+   * @param path timeseries path to look up
+   * @return true if the query returns at least one row
+   */
+  protected boolean checkTimeseriesExists(String path)
+      throws IoTDBConnectionException, StatementExecutionException {
+    SessionDataSet lookup = executeQueryStatement(String.format("SHOW TIMESERIES %s", path));
+    try {
+      return lookup.hasNext();
+    } finally {
+      lookup.closeOperationHandle();
+    }
+  }
+
+  /**
+   * Executes a query statement and wraps the response in a {@link SessionDataSet}. On a Thrift
+   * error the call is retried once, with refreshed session and statement ids, if
+   * {@code reconnect()} succeeds.
+   *
+   * @param sql query statement
+   * @return data set over the query result
+   * @throws StatementExecutionException if the server rejects the statement
+   * @throws IoTDBConnectionException    if reconnecting fails or the retry fails
+   */
+  protected SessionDataSet executeQueryStatement(String sql)
+      throws StatementExecutionException, IoTDBConnectionException {
+    TSExecuteStatementReq queryReq = new TSExecuteStatementReq(sessionId, sql, statementId);
+    queryReq.setFetchSize(session.fetchSize);
+    TSExecuteStatementResp queryResp;
+    try {
+      queryResp = client.executeQueryStatement(queryReq);
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(
+            MSG_RECONNECTION_FAIL);
+      }
+      try {
+        queryReq.setSessionId(sessionId);
+        queryReq.setStatementId(statementId);
+        queryResp = client.executeQueryStatement(queryReq);
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+
+    RpcUtils.verifySuccess(queryResp.getStatus());
+    return new SessionDataSet(sql, queryResp.getColumns(), queryResp.getDataTypeList(),
+        queryResp.columnNameIndexMap,
+        queryResp.getQueryId(), client, sessionId, queryResp.queryDataSet,
+        queryResp.isIgnoreTimeStamp());
+  }
+
+  /**
+   * Executes a non-query statement. On a Thrift error the call is retried once, with refreshed
+   * session and statement ids, if {@code reconnect()} succeeds.
+   *
+   * @param sql statement to execute
+   * @throws IoTDBConnectionException    if reconnecting fails or the retry fails
+   * @throws StatementExecutionException if the server rejects the statement
+   */
+  protected void executeNonQueryStatement(String sql)
+      throws IoTDBConnectionException, StatementExecutionException {
+    TSExecuteStatementReq updateReq = new TSExecuteStatementReq(sessionId, sql, statementId);
+    try {
+      TSExecuteStatementResp updateResp = client.executeUpdateStatement(updateReq);
+      RpcUtils.verifySuccess(updateResp.getStatus());
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(
+            MSG_RECONNECTION_FAIL);
+      }
+      try {
+        updateReq.setSessionId(sessionId);
+        updateReq.setStatementId(statementId);
+        RpcUtils.verifySuccess(client.executeUpdateStatement(updateReq).status);
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Runs a raw data query over the given paths and time range and wraps the response in a
+   * {@link SessionDataSet} (created with an empty sql string). On a Thrift error the call is
+   * retried once, with refreshed session and statement ids, if {@code reconnect()} succeeds.
+   *
+   * @param paths     timeseries to query
+   * @param startTime start time passed to the server
+   * @param endTime   end time passed to the server
+   * @return data set over the query result
+   * @throws StatementExecutionException if the server rejects the query
+   * @throws IoTDBConnectionException    if reconnecting fails or the retry fails
+   */
+  protected SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime)
+      throws StatementExecutionException, IoTDBConnectionException {
+    TSRawDataQueryReq rawReq = new TSRawDataQueryReq(sessionId, paths, startTime, endTime,
+        statementId);
+    rawReq.setFetchSize(session.fetchSize);
+    TSExecuteStatementResp rawResp;
+    try {
+      rawResp = client.executeRawDataQuery(rawReq);
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+      try {
+        rawReq.setSessionId(sessionId);
+        rawReq.setStatementId(statementId);
+        rawResp = client.executeRawDataQuery(rawReq);
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+
+    RpcUtils.verifySuccess(rawResp.getStatus());
+    return new SessionDataSet("", rawResp.getColumns(), rawResp.getDataTypeList(),
+        rawResp.columnNameIndexMap,
+        rawResp.getQueryId(), client, sessionId, rawResp.queryDataSet,
+        rawResp.isIgnoreTimeStamp());
+  }
+
+  /**
+   * Inserts one typed record. The session id is stamped on the request before sending; on a
+   * Thrift error the call is retried once if {@code reconnect()} succeeds, and the retry does
+   * not report redirections.
+   *
+   * @param request insert request
+   * @throws IoTDBConnectionException    if reconnecting fails or the retry fails
+   * @throws StatementExecutionException if the server rejects the request
+   * @throws RedirectException           if the first attempt answers with a redirection
+   */
+  protected void insertRecord(TSInsertRecordReq request)
+      throws IoTDBConnectionException, StatementExecutionException, RedirectException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccessWithRedirection(client.insertRecord(request));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(
+            MSG_RECONNECTION_FAIL);
+      }
+      try {
+        request.setSessionId(sessionId);
+        RpcUtils.verifySuccess(client.insertRecord(request));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Inserts one record whose values are given as strings. The session id is stamped on the
+   * request before sending; on a Thrift error the call is retried once if {@code reconnect()}
+   * succeeds, and the retry does not report redirections.
+   *
+   * @param request insert request
+   * @throws IoTDBConnectionException    if reconnecting fails or the retry fails
+   * @throws StatementExecutionException if the server rejects the request
+   * @throws RedirectException           if the first attempt answers with a redirection
+   */
+  protected void insertRecord(TSInsertStringRecordReq request)
+      throws IoTDBConnectionException, StatementExecutionException, RedirectException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccessWithRedirection(client.insertStringRecord(request));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(
+            MSG_RECONNECTION_FAIL);
+      }
+      try {
+        request.setSessionId(sessionId);
+        RpcUtils.verifySuccess(client.insertStringRecord(request));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Inserts a batch of typed records. The session id is stamped on the request before
+   * sending; on a Thrift error the call is retried once if {@code reconnect()} succeeds, and
+   * the retry does not report redirections.
+   *
+   * @param request batch insert request
+   * @throws IoTDBConnectionException    if reconnecting fails or the retry fails
+   * @throws StatementExecutionException if the server rejects the request
+   * @throws RedirectException           if the first attempt answers with a redirection
+   */
+  protected void insertRecords(TSInsertRecordsReq request)
+      throws IoTDBConnectionException, StatementExecutionException, RedirectException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccessWithRedirection(client.insertRecords(request));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+      try {
+        request.setSessionId(sessionId);
+        RpcUtils.verifySuccess(client.insertRecords(request));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Sends a multi-record, string-valued insert request, retrying once after a reconnect.
+   *
+   * <p>The first attempt is checked with {@code RpcUtils.verifySuccessWithRedirection}; the
+   * retry that follows a successful {@code reconnect()} is checked with
+   * {@code RpcUtils.verifySuccess}.
+   *
+   * @param request the request to send; its session id is overwritten here
+   * @throws IoTDBConnectionException if reconnecting fails, or the retried call also fails with
+   *     a {@code TException}
+   */
+  protected void insertRecords(TSInsertStringRecordsReq request)
+      throws IoTDBConnectionException, StatementExecutionException, RedirectException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccessWithRedirection(client.insertStringRecords(request));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+      // re-apply the session id after reconnecting, then send exactly once more
+      try {
+        request.setSessionId(sessionId);
+        RpcUtils.verifySuccess(client.insertStringRecords(request));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Sends a tablet insert request, retrying once after a reconnect.
+   *
+   * <p>The first attempt is checked with {@code RpcUtils.verifySuccessWithRedirection}; the
+   * retry that follows a successful {@code reconnect()} is checked with
+   * {@code RpcUtils.verifySuccess}.
+   *
+   * @param request the request to send; its session id is overwritten here
+   * @throws IoTDBConnectionException if reconnecting fails, or the retried call also fails with
+   *     a {@code TException}
+   */
+  protected void insertTablet(TSInsertTabletReq request)
+      throws IoTDBConnectionException, StatementExecutionException, RedirectException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccessWithRedirection(client.insertTablet(request));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+      // re-apply the session id after reconnecting, then send exactly once more
+      try {
+        request.setSessionId(sessionId);
+        RpcUtils.verifySuccess(client.insertTablet(request));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Sends a multi-tablet insert request, retrying once after a reconnect.
+   *
+   * <p>The first attempt is checked with
+   * {@code RpcUtils.verifySuccessWithRedirectionForInsertTablets}, which also receives the
+   * request; the retry that follows a successful {@code reconnect()} is checked with
+   * {@code RpcUtils.verifySuccess}.
+   *
+   * @param request the request to send; its session id is overwritten here
+   * @throws IoTDBConnectionException if reconnecting fails, or the retried call also fails with
+   *     a {@code TException}
+   */
+  protected void insertTablets(TSInsertTabletsReq request)
+      throws IoTDBConnectionException, StatementExecutionException, RedirectException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccessWithRedirectionForInsertTablets(
+          client.insertTablets(request), request);
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+      // re-apply the session id after reconnecting, then send exactly once more
+      try {
+        request.setSessionId(sessionId);
+        RpcUtils.verifySuccess(client.insertTablets(request));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Asks the server to delete the given timeseries, retrying once after a reconnect.
+   *
+   * <p>Both the first attempt and the retry are checked with {@code RpcUtils.verifySuccess}.
+   *
+   * @param paths the timeseries paths passed to {@code client.deleteTimeseries}
+   * @throws IoTDBConnectionException if reconnecting fails, or the retried call also fails with
+   *     a {@code TException}
+   */
+  protected void deleteTimeseries(List<String> paths)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      RpcUtils.verifySuccess(client.deleteTimeseries(sessionId, paths));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+      // sessionId is read again here, i.e. after the reconnect
+      try {
+        RpcUtils.verifySuccess(client.deleteTimeseries(sessionId, paths));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Sends a delete-data request, retrying once after a reconnect.
+   *
+   * <p>Both the first attempt and the retry are checked with {@code RpcUtils.verifySuccess}.
+   *
+   * @param request the request to send; its session id is overwritten here
+   * @throws IoTDBConnectionException if reconnecting fails, or the retried call also fails with
+   *     a {@code TException}
+   */
+  public void deleteData(TSDeleteDataReq request)
+      throws IoTDBConnectionException, StatementExecutionException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccess(client.deleteData(request));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+      // re-apply the session id after reconnecting, then send exactly once more
+      try {
+        request.setSessionId(sessionId);
+        RpcUtils.verifySuccess(client.deleteData(request));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Sends the request through {@code client.testInsertStringRecord}, retrying once after a
+   * reconnect.
+   *
+   * <p>Both the first attempt and the retry are checked with {@code RpcUtils.verifySuccess}.
+   *
+   * @param request the request to send; its session id is overwritten here
+   * @throws IoTDBConnectionException if reconnecting fails, or the retried call also fails with
+   *     a {@code TException}
+   */
+  protected void testInsertRecord(TSInsertStringRecordReq request)
+      throws IoTDBConnectionException, StatementExecutionException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccess(client.testInsertStringRecord(request));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+      // re-apply the session id after reconnecting, then send exactly once more
+      try {
+        request.setSessionId(sessionId);
+        RpcUtils.verifySuccess(client.testInsertStringRecord(request));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Sends the request through {@code client.testInsertRecord}, retrying once after a reconnect.
+   *
+   * <p>Both the first attempt and the retry are checked with {@code RpcUtils.verifySuccess}.
+   *
+   * @param request the request to send; its session id is overwritten here
+   * @throws IoTDBConnectionException if reconnecting fails, or the retried call also fails with
+   *     a {@code TException}
+   */
+  protected void testInsertRecord(TSInsertRecordReq request)
+      throws IoTDBConnectionException, StatementExecutionException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccess(client.testInsertRecord(request));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+      // re-apply the session id after reconnecting, then send exactly once more
+      try {
+        request.setSessionId(sessionId);
+        RpcUtils.verifySuccess(client.testInsertRecord(request));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Sends the request through {@code client.testInsertStringRecords}, retrying once after a
+   * reconnect.
+   *
+   * <p>Both the first attempt and the retry are checked with {@code RpcUtils.verifySuccess}.
+   *
+   * @param request the request to send; its session id is overwritten here
+   * @throws IoTDBConnectionException if reconnecting fails, or the retried call also fails with
+   *     a {@code TException}
+   */
+  public void testInsertRecords(TSInsertStringRecordsReq request)
+      throws IoTDBConnectionException, StatementExecutionException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccess(client.testInsertStringRecords(request));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+      // re-apply the session id after reconnecting, then send exactly once more
+      try {
+        request.setSessionId(sessionId);
+        RpcUtils.verifySuccess(client.testInsertStringRecords(request));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Sends the request through {@code client.testInsertRecords}, retrying once after a
+   * reconnect.
+   *
+   * <p>Both the first attempt and the retry are checked with {@code RpcUtils.verifySuccess}.
+   *
+   * @param request the request to send; its session id is overwritten here
+   * @throws IoTDBConnectionException if reconnecting fails, or the retried call also fails with
+   *     a {@code TException}
+   */
+  public void testInsertRecords(TSInsertRecordsReq request)
+      throws IoTDBConnectionException, StatementExecutionException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccess(client.testInsertRecords(request));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+      // re-apply the session id after reconnecting, then send exactly once more
+      try {
+        request.setSessionId(sessionId);
+        RpcUtils.verifySuccess(client.testInsertRecords(request));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Sends the request through {@code client.testInsertTablet}, retrying once after a reconnect.
+   *
+   * <p>Both the first attempt and the retry are checked with {@code RpcUtils.verifySuccess}.
+   *
+   * @param request the request to send; its session id is overwritten here
+   * @throws IoTDBConnectionException if reconnecting fails, or the retried call also fails with
+   *     a {@code TException}
+   */
+  protected void testInsertTablet(TSInsertTabletReq request)
+      throws IoTDBConnectionException, StatementExecutionException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccess(client.testInsertTablet(request));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+      // re-apply the session id after reconnecting, then send exactly once more
+      try {
+        request.setSessionId(sessionId);
+        RpcUtils.verifySuccess(client.testInsertTablet(request));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Sends the request through {@code client.testInsertTablets}, retrying once after a
+   * reconnect.
+   *
+   * <p>Both the first attempt and the retry are checked with {@code RpcUtils.verifySuccess}.
+   *
+   * @param request the request to send; its session id is overwritten here
+   * @throws IoTDBConnectionException if reconnecting fails, or the retried call also fails with
+   *     a {@code TException}
+   */
+  protected void testInsertTablets(TSInsertTabletsReq request)
+      throws IoTDBConnectionException, StatementExecutionException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccess(client.testInsertTablets(request));
+    } catch (TException firstFailure) {
+      if (!reconnect()) {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+      // re-apply the session id after reconnecting, then send exactly once more
+      try {
+        request.setSessionId(sessionId);
+        RpcUtils.verifySuccess(client.testInsertTablets(request));
+      } catch (TException retryFailure) {
+        throw new IoTDBConnectionException(retryFailure);
+      }
+    }
+  }
+
+  /**
+   * Tries to re-establish the connection, making at most {@code Config.RETRY_NUM} attempts.
+   *
+   * <p>Each attempt closes the current connection and calls {@code init(endPoint)}. The method
+   * returns as soon as one attempt succeeds, so a freshly opened connection is never closed and
+   * re-opened by a later iteration, and a later failure can no longer be reported as success.
+   * After a failed attempt the thread sleeps for {@code Config.RETRY_INTERVAL_MS} before the
+   * next one.
+   *
+   * @return {@code true} if an attempt completed without an exception; {@code false} if every
+   *     attempt failed or {@code transport} was {@code null} on every attempt
+   */
+  private boolean reconnect() {
+    for (int attempt = 1; attempt <= Config.RETRY_NUM; attempt++) {
+      try {
+        if (transport != null) {
+          close();
+          init(endPoint);
+          // stop here: the connection is usable again
+          return true;
+        }
+      } catch (Exception e) {
+        try {
+          Thread.sleep(Config.RETRY_INTERVAL_MS);
+        } catch (InterruptedException e1) {
+          logger.error("reconnect is interrupted.", e1);
+          // keep the interrupt status visible to the caller
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+    return false;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionDataSet.java 
b/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionDataSet.java
new file mode 100644
index 0000000..a2a0907
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionDataSet.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.serviceSession;
+
+import static org.apache.iotdb.rpc.IoTDBRpcDataSet.START_INDEX;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.IoTDBRpcDataSet;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.thrift.TException;
+
+/**
+ * Query result handed to session users; every operation is delegated to an
+ * {@link IoTDBRpcDataSet}.
+ *
+ * <p>Rows can be consumed either as {@link RowRecord}s through {@link #hasNext()} and
+ * {@link #next()}, or column by column through the {@link DataIterator} returned by
+ * {@link #iterator()}.
+ */
+public class SessionDataSet {
+
+  private final IoTDBRpcDataSet ioTDBRpcDataSet;
+
+  /**
+   * Creates the data set by forwarding all arguments, together with
+   * {@code Config.DEFAULT_FETCH_SIZE}, to the {@link IoTDBRpcDataSet} constructor.
+   */
+  public SessionDataSet(String sql, List<String> columnNameList, List<String> columnTypeList,
+      Map<String, Integer> columnNameIndex,
+      long queryId, TSIService.Iface client, long sessionId, TSQueryDataSet queryDataSet,
+      boolean ignoreTimeStamp) {
+    this.ioTDBRpcDataSet = new IoTDBRpcDataSet(sql, columnNameList, columnTypeList,
+        columnNameIndex, ignoreTimeStamp, queryId, client, sessionId, queryDataSet,
+        Config.DEFAULT_FETCH_SIZE);
+  }
+
+  /** Returns the fetch size currently set on the underlying data set. */
+  public int getFetchSize() {
+    return ioTDBRpcDataSet.fetchSize;
+  }
+
+  /** Sets the fetch size on the underlying data set. */
+  public void setFetchSize(int fetchSize) {
+    ioTDBRpcDataSet.fetchSize = fetchSize;
+  }
+
+  /** Returns a copy of the column name list. */
+  public List<String> getColumnNames() {
+    return new ArrayList<>(ioTDBRpcDataSet.columnNameList);
+  }
+
+  /** Returns a copy of the column type list. */
+  public List<TSDataType> getColumnTypes() {
+    return new ArrayList<>(ioTDBRpcDataSet.columnTypeList);
+  }
+
+  /**
+   * Delegates to {@code IoTDBRpcDataSet.next()}.
+   *
+   * @return the value returned by the underlying data set
+   */
+  public boolean hasNext() throws StatementExecutionException, IoTDBConnectionException {
+    return ioTDBRpcDataSet.next();
+  }
+
+  /**
+   * Builds a {@link RowRecord} from the raw byte values currently held by the underlying data
+   * set.
+   *
+   * @throws UnSupportedDataTypeException if a non-null column has a type this method cannot
+   *     decode
+   */
+  private RowRecord constructRowRecordFromValueArray() throws StatementExecutionException {
+    List<Field> outFields = new ArrayList<>();
+    for (int i = 0; i < ioTDBRpcDataSet.columnSize; i++) {
+      Field field;
+
+      // both indexes are shifted back by one when ignoreTimeStamp is set
+      // (presumably because there is then no leading time column -- confirm)
+      int index = i + 1;
+      int datasetColumnIndex = i + START_INDEX;
+      if (ioTDBRpcDataSet.ignoreTimeStamp) {
+        index--;
+        datasetColumnIndex--;
+      }
+      // position of this column inside the de-duplicated value/type arrays
+      int loc =
+          ioTDBRpcDataSet.columnOrdinalMap.get(ioTDBRpcDataSet.columnNameList.get(index))
+              - START_INDEX;
+
+      if (!ioTDBRpcDataSet.isNull(datasetColumnIndex)) {
+        byte[] valueBytes = ioTDBRpcDataSet.values[loc];
+        TSDataType dataType = ioTDBRpcDataSet.columnTypeDeduplicatedList.get(loc);
+        field = new Field(dataType);
+        switch (dataType) {
+          case BOOLEAN:
+            boolean booleanValue = BytesUtils.bytesToBool(valueBytes);
+            field.setBoolV(booleanValue);
+            break;
+          case INT32:
+            int intValue = BytesUtils.bytesToInt(valueBytes);
+            field.setIntV(intValue);
+            break;
+          case INT64:
+            long longValue = BytesUtils.bytesToLong(valueBytes);
+            field.setLongV(longValue);
+            break;
+          case FLOAT:
+            float floatValue = BytesUtils.bytesToFloat(valueBytes);
+            field.setFloatV(floatValue);
+            break;
+          case DOUBLE:
+            double doubleValue = BytesUtils.bytesToDouble(valueBytes);
+            field.setDoubleV(doubleValue);
+            break;
+          case TEXT:
+            field.setBinaryV(new Binary(valueBytes));
+            break;
+          default:
+            // report the type that was actually switched on; the de-duplicated list is
+            // indexed by loc, so looking it up with i could name the wrong type or fail
+            throw new UnSupportedDataTypeException(String
+                .format("Data type %s is not supported.", dataType));
+        }
+      } else {
+        field = new Field(null);
+      }
+      outFields.add(field);
+    }
+    return new RowRecord(BytesUtils.bytesToLong(ioTDBRpcDataSet.time), outFields);
+  }
+
+  /**
+   * Returns the next row.
+   *
+   * @return the next row, or {@code null} when no record is cached and {@link #hasNext()}
+   *     returns {@code false}
+   */
+  public RowRecord next() throws StatementExecutionException, IoTDBConnectionException {
+    if (!ioTDBRpcDataSet.hasCachedRecord && !hasNext()) {
+      return null;
+    }
+    ioTDBRpcDataSet.hasCachedRecord = false;
+
+    return constructRowRecordFromValueArray();
+  }
+
+  /**
+   * Closes the underlying data set.
+   *
+   * @throws IoTDBConnectionException wrapping the {@code TException} raised while closing, so
+   *     the original stack trace stays available as the cause
+   */
+  public void closeOperationHandle() throws StatementExecutionException, IoTDBConnectionException {
+    try {
+      ioTDBRpcDataSet.close();
+    } catch (TException e) {
+      throw new IoTDBConnectionException(e);
+    }
+  }
+
+  /** Returns a new column-oriented iterator over this data set. */
+  public DataIterator iterator() {
+    return new DataIterator();
+  }
+
+  /**
+   * Column-oriented accessor; every method forwards to the method of the same name on the
+   * enclosing data set's {@link IoTDBRpcDataSet}.
+   */
+  public class DataIterator {
+
+    public boolean next() throws StatementExecutionException, IoTDBConnectionException {
+      return ioTDBRpcDataSet.next();
+    }
+
+    public boolean isNull(int columnIndex) throws StatementExecutionException {
+      return ioTDBRpcDataSet.isNull(columnIndex);
+    }
+
+    public boolean isNull(String columnName) throws StatementExecutionException {
+      return ioTDBRpcDataSet.isNull(columnName);
+    }
+
+    public boolean getBoolean(int columnIndex) throws StatementExecutionException {
+      return ioTDBRpcDataSet.getBoolean(columnIndex);
+    }
+
+    public boolean getBoolean(String columnName) throws StatementExecutionException {
+      return ioTDBRpcDataSet.getBoolean(columnName);
+    }
+
+    public double getDouble(int columnIndex) throws StatementExecutionException {
+      return ioTDBRpcDataSet.getDouble(columnIndex);
+    }
+
+    public double getDouble(String columnName) throws StatementExecutionException {
+      return ioTDBRpcDataSet.getDouble(columnName);
+    }
+
+    public float getFloat(int columnIndex) throws StatementExecutionException {
+      return ioTDBRpcDataSet.getFloat(columnIndex);
+    }
+
+    public float getFloat(String columnName) throws StatementExecutionException {
+      return ioTDBRpcDataSet.getFloat(columnName);
+    }
+
+    public int getInt(int columnIndex) throws StatementExecutionException {
+      return ioTDBRpcDataSet.getInt(columnIndex);
+    }
+
+    public int getInt(String columnName) throws StatementExecutionException {
+      return ioTDBRpcDataSet.getInt(columnName);
+    }
+
+    public long getLong(int columnIndex) throws StatementExecutionException {
+      return ioTDBRpcDataSet.getLong(columnIndex);
+    }
+
+    public long getLong(String columnName) throws StatementExecutionException {
+      return ioTDBRpcDataSet.getLong(columnName);
+    }
+
+    public Object getObject(int columnIndex) throws StatementExecutionException {
+      return ioTDBRpcDataSet.getObject(columnIndex);
+    }
+
+    public Object getObject(String columnName) throws StatementExecutionException {
+      return ioTDBRpcDataSet.getObject(columnName);
+    }
+
+    public String getString(int columnIndex) throws StatementExecutionException {
+      return ioTDBRpcDataSet.getString(columnIndex);
+    }
+
+    public String getString(String columnName) throws StatementExecutionException {
+      return ioTDBRpcDataSet.getString(columnName);
+    }
+
+    public Timestamp getTimestamp(int columnIndex) throws StatementExecutionException {
+      return ioTDBRpcDataSet.getTimestamp(columnIndex);
+    }
+
+    public Timestamp getTimestamp(String columnName) throws StatementExecutionException {
+      return ioTDBRpcDataSet.getTimestamp(columnName);
+    }
+
+    public int findColumn(String columnName) {
+      return ioTDBRpcDataSet.findColumn(columnName);
+    }
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionUtils.java 
b/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionUtils.java
new file mode 100644
index 0000000..8e48bf1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionUtils.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.serviceSession;
+
+import java.nio.ByteBuffer;
+import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.Binary;
+import org.apache.iotdb.tsfile.utils.BytesUtils;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+
+public class SessionUtils {
+
+  public static ByteBuffer getTimeBuffer(Tablet tablet) {
+    ByteBuffer timeBuffer = ByteBuffer.allocate(tablet.getTimeBytesSize());
+    for (int i = 0; i < tablet.rowSize; i++) {
+      timeBuffer.putLong(tablet.timestamps[i]);
+    }
+    timeBuffer.flip();
+    return timeBuffer;
+  }
+
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
+  public static ByteBuffer getValueBuffer(Tablet tablet) {
+    ByteBuffer valueBuffer = ByteBuffer.allocate(tablet.getValueBytesSize());
+    for (int i = 0; i < tablet.getSchemas().size(); i++) {
+      TSDataType dataType = tablet.getSchemas().get(i).getType();
+      switch (dataType) {
+        case INT32:
+          int[] intValues = (int[]) tablet.values[i];
+          for (int index = 0; index < tablet.rowSize; index++) {
+            valueBuffer.putInt(intValues[index]);
+          }
+          break;
+        case INT64:
+          long[] longValues = (long[]) tablet.values[i];
+          for (int index = 0; index < tablet.rowSize; index++) {
+            valueBuffer.putLong(longValues[index]);
+          }
+          break;
+        case FLOAT:
+          float[] floatValues = (float[]) tablet.values[i];
+          for (int index = 0; index < tablet.rowSize; index++) {
+            valueBuffer.putFloat(floatValues[index]);
+          }
+          break;
+        case DOUBLE:
+          double[] doubleValues = (double[]) tablet.values[i];
+          for (int index = 0; index < tablet.rowSize; index++) {
+            valueBuffer.putDouble(doubleValues[index]);
+          }
+          break;
+        case BOOLEAN:
+          boolean[] boolValues = (boolean[]) tablet.values[i];
+          for (int index = 0; index < tablet.rowSize; index++) {
+            valueBuffer.put(BytesUtils.boolToByte(boolValues[index]));
+          }
+          break;
+        case TEXT:
+          Binary[] binaryValues = (Binary[]) tablet.values[i];
+          for (int index = 0; index < tablet.rowSize; index++) {
+            valueBuffer.putInt(binaryValues[index].getLength());
+            valueBuffer.put(binaryValues[index].getValues());
+          }
+          break;
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Data type %s is not supported.", dataType));
+      }
+    }
+    valueBuffer.flip();
+    return valueBuffer;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/serviceSession/pool/SessionDataSetWrapper.java
 
b/server/src/main/java/org/apache/iotdb/db/serviceSession/pool/SessionDataSetWrapper.java
new file mode 100644
index 0000000..e29bec6
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/serviceSession/pool/SessionDataSetWrapper.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.serviceSession.pool;
+
+import java.util.List;
+import org.apache.iotdb.db.serviceSession.Session;
+import org.apache.iotdb.db.serviceSession.SessionDataSet;
+import org.apache.iotdb.db.serviceSession.SessionDataSet.DataIterator;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+
+/**
+ * Couples a {@link SessionDataSet} with the {@link Session} that produced it and the
+ * {@link SessionPool} the session came from, so the pool can be told when the result set is
+ * finished with.
+ */
+public class SessionDataSetWrapper implements AutoCloseable {
+
+  SessionDataSet sessionDataSet;
+  Session session;
+  SessionPool pool;
+
+  public SessionDataSetWrapper(SessionDataSet sessionDataSet,
+      Session session, SessionPool pool) {
+    this.pool = pool;
+    this.session = session;
+    this.sessionDataSet = sessionDataSet;
+  }
+
+  protected Session getSession() {
+    return session;
+  }
+
+  /** Returns the fetch size of the wrapped data set. */
+  public int getBatchSize() {
+    return sessionDataSet.getFetchSize();
+  }
+
+  /** Sets the fetch size of the wrapped data set. */
+  public void setBatchSize(int batchSize) {
+    sessionDataSet.setFetchSize(batchSize);
+  }
+
+  /**
+   * Checks whether the wrapped data set has another row. When it has none, the result set is
+   * handed to {@code pool.closeResultSet} before returning.
+   *
+   * <p>If this call throws and you do not want to use the result set any more, you have to
+   * release it manually by calling closeResultSet.
+   *
+   * @return whatever the wrapped data set's {@code hasNext()} returned
+   */
+  public boolean hasNext() throws IoTDBConnectionException, StatementExecutionException {
+    if (sessionDataSet.hasNext()) {
+      return true;
+    }
+    pool.closeResultSet(this);
+    return false;
+  }
+
+  /**
+   * Returns the next row of the wrapped data set.
+   *
+   * <p>If this call throws and you do not want to use the result set any more, you have to
+   * release it manually by calling closeResultSet.
+   *
+   * @return whatever the wrapped data set's {@code next()} returned
+   */
+  public RowRecord next() throws IoTDBConnectionException, StatementExecutionException {
+    return sessionDataSet.next();
+  }
+
+  /**
+   * retrieve data set like jdbc
+   */
+  public DataIterator iterator() {
+    return sessionDataSet.iterator();
+  }
+
+  public List<String> getColumnNames() {
+    return sessionDataSet.getColumnNames();
+  }
+
+  public List<TSDataType> getColumnTypes() {
+    return sessionDataSet.getColumnTypes();
+  }
+
+  /**
+   * close this dataset to release the session
+   */
+  public void close() {
+    pool.closeResultSet(this);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/serviceSession/pool/SessionPool.java 
b/server/src/main/java/org/apache/iotdb/db/serviceSession/pool/SessionPool.java
new file mode 100644
index 0000000..a69fed1
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/serviceSession/pool/SessionPool.java
@@ -0,0 +1,983 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.serviceSession.pool;
+
+import java.time.ZoneId;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.iotdb.db.serviceSession.Config;
+import org.apache.iotdb.db.serviceSession.Session;
+import org.apache.iotdb.db.serviceSession.SessionDataSet;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.record.Tablet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SessionPool is a wrapper of a Session Set. Using SessionPool, the user does not need to
+ * consider how to reuse a session connection. Even if the session is disconnected, the session
+ * pool can recognize it, remove the broken session connection and create a new one.
+ * <p>
+ * If there are no available connections and the pool has reached its max size, all methods will
+ * hang until a connection becomes available.
+ * <p>
+ * If a user has waited for a session for more than 60 seconds, a warn log will be printed.
+ * <p>
+ * The only thing you have to remember is that:
+ * <p>
+ * For a query, if you have got all data, i.e., SessionDataSetWrapper.hasNext() == false, it is
+ * ok. Otherwise, i.e., you want to stop the query before you get all data
+ * (SessionDataSetWrapper.hasNext() == true), then you have to call
+ * closeResultSet(SessionDataSetWrapper wrapper) manually. Otherwise the connection is occupied
+ * by the query.
+ * <p>
+ * Another case in which you have to manually call closeResultSet() is when an exception is
+ * thrown by SessionDataSetWrapper.hasNext() or next().
+ */
+public class SessionPool {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SessionPool.class);
+  public static final String SESSION_POOL_IS_CLOSED = "Session pool is closed";
+  public static final String CLOSE_THE_SESSION_FAILED = "close the session 
failed.";
+  private static int RETRY = 3;
+  private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>();
+  //for session whose resultSet is not released.
+  private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>();
+  private int size = 0;
+  private int maxSize = 0;
+  private String ip;
+  private int port;
+  private String user;
+  private String password;
+  private int fetchSize;
+  private long timeout; //ms
+  private static int FINAL_RETRY = RETRY - 1;
+  private boolean enableCompression = false;
+  private ZoneId zoneId;
+
+  private boolean closed;//whether the queue is closed.
+
+  /**
+   * Creates a pool with {@code Config.DEFAULT_FETCH_SIZE}, a 60000 ms timeout, compression
+   * disabled and no zone id.
+   */
+  public SessionPool(String ip, int port, String user, String password, int maxSize) {
+    this(ip, port, user, password, maxSize,
+        Config.DEFAULT_FETCH_SIZE, 60_000, false, null);
+  }
+
+  /**
+   * Creates a pool with {@code Config.DEFAULT_FETCH_SIZE}, a 60000 ms timeout, the given
+   * compression setting and no zone id.
+   */
+  public SessionPool(String ip, int port, String user, String password, int maxSize,
+      boolean enableCompression) {
+    this(ip, port, user, password, maxSize,
+        Config.DEFAULT_FETCH_SIZE, 60_000, enableCompression, null);
+  }
+
+  /**
+   * Creates a pool with {@code Config.DEFAULT_FETCH_SIZE}, a 60000 ms timeout, compression
+   * disabled and the given zone id.
+   */
+  public SessionPool(String ip, int port, String user, String password, int maxSize,
+      ZoneId zoneId) {
+    this(ip, port, user, password, maxSize,
+        Config.DEFAULT_FETCH_SIZE, 60_000, false, zoneId);
+  }
+
+  /**
+   * Creates a pool; no session is opened here, the arguments are only stored.
+   *
+   * @param ip server address passed to every new {@code Session}
+   * @param port server port passed to every new {@code Session}
+   * @param user user name passed to every new {@code Session}
+   * @param password password passed to every new {@code Session}
+   * @param maxSize upper bound on the number of sessions the pool accounts for
+   * @param fetchSize fetch size passed to every new {@code Session}
+   * @param timeout how long a caller may wait for a session, in milliseconds
+   * @param enableCompression passed to {@code Session.open} for every new session
+   * @param zoneId zone id passed to every new {@code Session}
+   */
+  @SuppressWarnings("squid:S107")
+  public SessionPool(String ip, int port, String user, String password, int maxSize,
+      int fetchSize, long timeout, boolean enableCompression, ZoneId zoneId) {
+    // connection target and credentials
+    this.ip = ip;
+    this.port = port;
+    this.user = user;
+    this.password = password;
+    // pool and session tuning
+    this.maxSize = maxSize;
+    this.fetchSize = fetchSize;
+    this.timeout = timeout;
+    this.enableCompression = enableCompression;
+    this.zoneId = zoneId;
+  }
+
+  /**
+   * Takes an idle session from the queue, opens a new one while fewer than {@code maxSize}
+   * sessions are accounted for, or otherwise waits until a session is put back.
+   *
+   * <p>If this method throws an exception, either the server is broken, the
+   * ip/port/user/password is incorrect, the pool has been closed, or no session became
+   * available within {@code timeout} milliseconds.
+   *
+   * <p>The password is intentionally never written to the log messages emitted here.
+   *
+   * @return a session for the caller's use
+   * @throws IoTDBConnectionException if the pool is closed, opening a new session fails, or the
+   *     wait for a free session exceeds {@code timeout}
+   */
+  @SuppressWarnings({"squid:S3776","squid:S2446"}) // Suppress high Cognitive Complexity warning
+  private Session getSession() throws IoTDBConnectionException {
+    Session session = queue.poll();
+    if (closed) {
+      throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
+    }
+    if (session != null) {
+      return session;
+    } else {
+      long start = System.currentTimeMillis();
+      boolean canCreate = false;
+      synchronized (this) {
+        if (size < maxSize) {
+          // reserve a slot now; the session itself is opened outside the synchronized block
+          // because connecting is time consuming.
+          size++;
+          canCreate = true;
+        }
+      }
+      if (canCreate) {
+        //create a new one.
+        if (logger.isDebugEnabled()) {
+          logger.debug("Create a new Session {}, {}, {}", ip, port, user);
+        }
+        session = new Session(ip, port, user, password, fetchSize, zoneId);
+        try {
+          session.open(enableCompression);
+          //avoid someone has called close() the session pool
+          synchronized (this) {
+            if (closed) {
+              //have to release the connection...
+              session.close();
+              throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
+            } else {
+              return session;
+            }
+          }
+        } catch (IoTDBConnectionException e) {
+          //if exception, we will throw the exception.
+          //Meanwhile, we have to give the reserved slot back (size--)
+          synchronized (this) {
+            size--;
+            //we do not need to notifyAll as any waited thread can continue to work after waked up.
+            this.notify();
+            if (logger.isDebugEnabled()) {
+              logger.debug("open session failed, reduce the count and notify others...");
+            }
+          }
+          throw e;
+        }
+      }
+      else {
+        while (session == null) {
+          synchronized (this) {
+            if (closed) {
+              throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED);
+            }
+            //we have to wait for someone returns a session.
+            try {
+              if (logger.isDebugEnabled()) {
+                logger.debug("no more sessions can be created, wait... queue.size={}",
+                    queue.size());
+              }
+              this.wait(1000);
+              // warn once the wait exceeds the timeout or 60 seconds, whichever is smaller
+              long time = timeout < 60_000 ? timeout : 60_000;
+              if (System.currentTimeMillis() - start > time) {
+                logger.warn(
+                    "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}",
+                    (System.currentTimeMillis() - start) / 1000, ip, port, user);
+                logger.warn("current occupied size {}, queue size {}, considered size {} ",
+                    occupied.size(), queue.size(), size);
+                if (System.currentTimeMillis() - start > timeout) {
+                  throw new IoTDBConnectionException(
+                      String.format("timeout to get a connection from %s:%s", ip, port));
+                }
+              }
+            } catch (InterruptedException e) {
+              logger.error("the SessionPool is damaged", e);
+              Thread.currentThread().interrupt();
+            }
+            session = queue.poll();
+          }
+        }
+        return session;
+      }
+    }
+  }
+
+  public int currentAvailableSize() {
+    // number of idle sessions currently waiting in the queue
+    final int idle = queue.size();
+    return idle;
+  }
+
+  public int currentOccupiedSize() {
+    // number of sessions currently registered in the occupied map
+    final int inUse = occupied.size();
+    return inUse;
+  }
+
+  @SuppressWarnings({"squid:S2446"})
+  private void putBack(Session session) {
+    // publish the session first so that a woken waiter can poll it from the queue
+    queue.push(session);
+    synchronized (this) {
+      // one notify() suffices: any single woken waiter can continue to work.
+      // No logging here on purpose, putBack is called too frequently.
+      notify();
+    }
+  }
+
+  private void occupy(Session session) {
+    // the session is stored as both key and value of the occupied map
+    final Session key = session;
+    occupied.put(key, session);
+  }
+
+  /**
+   * Close all connections in the pool, both the idle ones and the ones registered as occupied.
+   * Closing is best effort: a failure on one session is logged and the rest are still closed.
+   */
+  public synchronized void close() {
+    // flip the flag first so that getSession() stops handing out sessions being closed
+    this.closed = true;
+    logger.info("closing the session pool, cleaning queues...");
+    for (Session session : queue) {
+      // closeSession logs and swallows any failure, so one bad session cannot abort the loop
+      closeSession(session);
+    }
+    for (Session session : occupied.keySet()) {
+      closeSession(session);
+    }
+    queue.clear();
+    occupied.clear();
+    // wake every thread blocked in getSession() so it observes the closed flag immediately
+    this.notifyAll();
+  }
+
+  /**
+   * Release the result set held by the wrapper and hand its session back to the pool. If
+   * closing the operation handle fails, the session is closed and dropped from the pool
+   * instead of being reused.
+   *
+   * @param wrapper the wrapper returned by executeQueryStatement
+   */
+  public void closeResultSet(SessionDataSetWrapper wrapper) {
+    boolean broken = false;
+    try {
+      wrapper.sessionDataSet.closeOperationHandle();
+    } catch (IoTDBConnectionException | StatementExecutionException e) {
+      logger.warn("close the result set failed, the session will be discarded", e);
+      broken = true;
+    } finally {
+      // only act when the session is still tracked, so that a repeated call (or a call after
+      // the pool was closed) neither returns the session twice nor decrements size twice
+      Session session = occupied.remove(wrapper.session);
+      if (session != null) {
+        if (broken) {
+          // release the underlying connection before giving up the slot
+          closeSession(session);
+          removeSession();
+        } else {
+          putBack(session);
+        }
+      }
+    }
+  }
+
+  @SuppressWarnings({"squid:S2446"})
+  private synchronized void removeSession() {
+    logger.warn("Remove a broken Session {}, {}, {}", ip, port, user);
+    // give the slot of the discarded session back to the pool
+    size--;
+    // a single notify() is enough: whichever waiter wakes up can continue to work.
+    notify();
+    if (!logger.isDebugEnabled()) {
+      return;
+    }
+    logger.debug("remove a broken session and notify others..., queue.size = {}", queue.size());
+  }
+
+  private void closeSession(Session session) {
+    // nothing to release when no session was handed in
+    if (session == null) {
+      return;
+    }
+    try {
+      session.close();
+    } catch (Exception closeFailure) {
+      // best effort only: the failure is logged and not propagated
+      logger.warn(CLOSE_THE_SESSION_FAILED, closeFailure);
+    }
+  }
+
+  private void cleanSessionAndMayThrowConnectionException(Session session, int times,
+      IoTDBConnectionException e) throws IoTDBConnectionException {
+    // the connection failed: close the session and release its slot in the pool
+    closeSession(session);
+    removeSession();
+    if (times != FINAL_RETRY) {
+      // not the final attempt yet, the caller's loop goes on
+      return;
+    }
+    final String message = String.format(
+        "retry to execute statement on %s:%s failed %d times: %s", ip, port, RETRY,
+        e.getMessage());
+    throw new IoTDBConnectionException(message, e);
+  }
+
+  /**
+   * Insert the data of one device; every timestamp carries the same number of measurements.
+   * Delegates to {@code insertTablet(tablet, false)}.
+   *
+   *  a Tablet example:
+   *
+   *        device1
+   *     time s1, s2, s3
+   *     1,   1,  1,  1
+   *     2,   2,  2,  2
+   *     3,   3,  3,  3
+   *
+   * times in Tablet may be not in ascending order
+   *
+   * @param tablet data batch
+   */
+  public void insertTablet(Tablet tablet)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final boolean sorted = false;
+    insertTablet(tablet, sorted);
+  }
+
+  /**
+   * Insert the data of one device; every timestamp carries the same number of measurements.
+   *
+   * a Tablet example:
+   *
+   *      device1
+   * time s1, s2, s3
+   * 1,   1,  1,  1
+   * 2,   2,  2,  2
+   * 3,   3,  3,  3
+   *
+   * Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
+   *
+   * @param tablet a tablet data of one device
+   * @param sorted whether times in Tablet are in ascending order
+   */
+  public void insertTablet(Tablet tablet, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    int attempt = 0;
+    while (attempt < RETRY) {
+      final Session borrowed = getSession();
+      try {
+        borrowed.insertTablet(tablet, sorted);
+        putBack(borrowed);
+        return;
+      } catch (IoTDBConnectionException connectionError) {
+        // broken connection: discard this session, the next round borrows another one
+        logger.warn("insertTablet failed", connectionError);
+        cleanSessionAndMayThrowConnectionException(borrowed, attempt, connectionError);
+      } catch (StatementExecutionException | RuntimeException failure) {
+        // the session is handed back before the failure is propagated
+        putBack(borrowed);
+        throw failure;
+      }
+      attempt++;
+    }
+  }
+
+
+  /**
+   * Use the batch interface to insert data. Delegates to {@code insertTablets(tablets, false)}.
+   *
+   * @param tablets multiple batch
+   */
+  public void insertTablets(Map<String, Tablet> tablets)
+      throws IoTDBConnectionException, StatementExecutionException {
+    final boolean sorted = false;
+    insertTablets(tablets, sorted);
+  }
+
+  /**
+   * Use the batch interface to insert data.
+   *
+   * @param tablets multiple batch
+   * @param sorted  passed through unchanged to Session#insertTablets
+   */
+  public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    int attempt = 0;
+    while (attempt < RETRY) {
+      final Session borrowed = getSession();
+      try {
+        borrowed.insertTablets(tablets, sorted);
+        putBack(borrowed);
+        return;
+      } catch (IoTDBConnectionException connectionError) {
+        // broken connection: discard this session, the next round borrows another one
+        logger.warn("insertTablets failed", connectionError);
+        cleanSessionAndMayThrowConnectionException(borrowed, attempt, connectionError);
+      } catch (StatementExecutionException | RuntimeException failure) {
+        // the session is handed back before the failure is propagated
+        putBack(borrowed);
+        throw failure;
+      }
+      attempt++;
+    }
+  }
+
+  /**
+   * Use the batch interface to insert data from an already built thrift request.
+   *
+   * @param req the request handed unchanged to Session#insertTablets
+   */
+  public void insertTablets(TSInsertTabletsReq req)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.insertTablets(req);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // the connection is broken: remove this session and retry with another one.
+        logger.warn("insertTablets failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (Exception e) {
+        // Any other failure leaves the session usable, so return it before propagating.
+        // Catching Exception (rethrown precisely) also covers declared checked failures,
+        // which previously bypassed putBack and leaked the borrowed session.
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Insert data in batch format, which can reduce the overhead of network. This method is just
+   * like a jdbc batch insert: several insert requests are packed into one batch and sent to the
+   * server. If you want to improve your performance, please see the insertTablet method.
+   *
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertRecords(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList) throws IoTDBConnectionException, StatementExecutionException {
+    int attempt = 0;
+    while (attempt < RETRY) {
+      final Session borrowed = getSession();
+      try {
+        borrowed.insertRecords(deviceIds, times, measurementsList, typesList, valuesList);
+        putBack(borrowed);
+        return;
+      } catch (IoTDBConnectionException connectionError) {
+        // broken connection: discard this session, the next round borrows another one
+        logger.warn("insertRecords failed", connectionError);
+        cleanSessionAndMayThrowConnectionException(borrowed, attempt, connectionError);
+      } catch (StatementExecutionException | RuntimeException failure) {
+        // the session is handed back before the failure is propagated
+        putBack(borrowed);
+        throw failure;
+      }
+      attempt++;
+    }
+  }
+
+
+  /**
+   * Insert data in batch format, which can reduce the overhead of network. This method is just
+   * like a jdbc batch insert: several insert requests are packed into one batch and sent to the
+   * server. If you want to improve your performance, please see the insertTablet method.
+   *
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertRecords(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    int attempt = 0;
+    while (attempt < RETRY) {
+      final Session borrowed = getSession();
+      try {
+        borrowed.insertRecords(deviceIds, times, measurementsList, valuesList);
+        putBack(borrowed);
+        return;
+      } catch (IoTDBConnectionException connectionError) {
+        // broken connection: discard this session, the next round borrows another one
+        logger.warn("insertRecords failed", connectionError);
+        cleanSessionAndMayThrowConnectionException(borrowed, attempt, connectionError);
+      } catch (StatementExecutionException | RuntimeException failure) {
+        // the session is handed back before the failure is propagated
+        putBack(borrowed);
+        throw failure;
+      }
+      attempt++;
+    }
+  }
+
+  /**
+   * Insert data in one row. If you want to improve your performance, please use the
+   * insertRecords method or the insertTablet method.
+   *
+   * @see Session#insertRecords(List, List, List, List, List)
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertRecord(String deviceId, long time, List<String> measurements,
+      List<TSDataType> types, List<Object> values)
+      throws IoTDBConnectionException, StatementExecutionException {
+    int attempt = 0;
+    while (attempt < RETRY) {
+      final Session borrowed = getSession();
+      try {
+        borrowed.insertRecord(deviceId, time, measurements, types, values);
+        putBack(borrowed);
+        return;
+      } catch (IoTDBConnectionException connectionError) {
+        // broken connection: discard this session, the next round borrows another one
+        logger.warn("insertRecord failed", connectionError);
+        cleanSessionAndMayThrowConnectionException(borrowed, attempt, connectionError);
+      } catch (StatementExecutionException | RuntimeException failure) {
+        // the session is handed back before the failure is propagated
+        putBack(borrowed);
+        throw failure;
+      }
+      attempt++;
+    }
+  }
+
+  /**
+   * Insert data in one row, with the values given as strings. If you want to improve your
+   * performance, please use the insertRecords method or the insertTablet method.
+   *
+   * @see Session#insertRecords(List, List, List, List, List)
+   * @see Session#insertTablet(Tablet)
+   */
+  public void insertRecord(String deviceId, long time, List<String> measurements,
+      List<String> values)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.insertRecord(deviceId, time, measurements, values);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // the connection is broken: remove this session and retry with another one.
+        logger.warn("insertRecord failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        // return the session before propagating; unchecked failures are included so that
+        // the borrowed session is never leaked (same handling as the typed overload)
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * This method does NOT insert data into the database: the server just returns after
+   * accepting the request. It should be used to test other time cost in the client.
+   */
+  public void testInsertTablet(Tablet tablet)
+      throws IoTDBConnectionException, StatementExecutionException {
+    int attempt = 0;
+    while (attempt < RETRY) {
+      final Session borrowed = getSession();
+      try {
+        borrowed.testInsertTablet(tablet);
+        putBack(borrowed);
+        return;
+      } catch (IoTDBConnectionException connectionError) {
+        // broken connection: discard this session, the next round borrows another one
+        logger.warn("testInsertTablet failed", connectionError);
+        cleanSessionAndMayThrowConnectionException(borrowed, attempt, connectionError);
+      } catch (StatementExecutionException | RuntimeException failure) {
+        // the session is handed back before the failure is propagated
+        putBack(borrowed);
+        throw failure;
+      }
+      attempt++;
+    }
+  }
+
+  /**
+   * This method does NOT insert data into the database: the server just returns after
+   * accepting the request. It should be used to test other time cost in the client.
+   */
+  public void testInsertTablets(Map<String, Tablet> tablets)
+      throws IoTDBConnectionException, StatementExecutionException {
+    int attempt = 0;
+    while (attempt < RETRY) {
+      final Session borrowed = getSession();
+      try {
+        borrowed.testInsertTablets(tablets);
+        putBack(borrowed);
+        return;
+      } catch (IoTDBConnectionException connectionError) {
+        // broken connection: discard this session, the next round borrows another one
+        logger.warn("testInsertTablets failed", connectionError);
+        cleanSessionAndMayThrowConnectionException(borrowed, attempt, connectionError);
+      } catch (StatementExecutionException | RuntimeException failure) {
+        // the session is handed back before the failure is propagated
+        putBack(borrowed);
+        throw failure;
+      }
+      attempt++;
+    }
+  }
+
+  /**
+   * This method does NOT insert data into the database: the server just returns after
+   * accepting the request. It should be used to test other time cost in the client.
+   */
+  public void testInsertRecords(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<String>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    int attempt = 0;
+    while (attempt < RETRY) {
+      final Session borrowed = getSession();
+      try {
+        borrowed.testInsertRecords(deviceIds, times, measurementsList, valuesList);
+        putBack(borrowed);
+        return;
+      } catch (IoTDBConnectionException connectionError) {
+        // broken connection: discard this session, the next round borrows another one
+        logger.warn("testInsertRecords failed", connectionError);
+        cleanSessionAndMayThrowConnectionException(borrowed, attempt, connectionError);
+      } catch (StatementExecutionException | RuntimeException failure) {
+        // the session is handed back before the failure is propagated
+        putBack(borrowed);
+        throw failure;
+      }
+      attempt++;
+    }
+  }
+
+  /**
+   * This method does NOT insert data into the database: the server just returns after
+   * accepting the request. It should be used to test other time cost in the client.
+   */
+  public void testInsertRecords(List<String> deviceIds, List<Long> times,
+      List<List<String>> measurementsList, List<List<TSDataType>> typesList,
+      List<List<Object>> valuesList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    int attempt = 0;
+    while (attempt < RETRY) {
+      final Session borrowed = getSession();
+      try {
+        borrowed.testInsertRecords(deviceIds, times, measurementsList, typesList, valuesList);
+        putBack(borrowed);
+        return;
+      } catch (IoTDBConnectionException connectionError) {
+        // broken connection: discard this session, the next round borrows another one
+        logger.warn("testInsertRecords failed", connectionError);
+        cleanSessionAndMayThrowConnectionException(borrowed, attempt, connectionError);
+      } catch (StatementExecutionException | RuntimeException failure) {
+        // the session is handed back before the failure is propagated
+        putBack(borrowed);
+        throw failure;
+      }
+      attempt++;
+    }
+  }
+
+  /**
+   * This method does NOT insert data into the database: the server just returns after
+   * accepting the request. It should be used to test other time cost in the client.
+   */
+  public void testInsertRecord(String deviceId, long time, List<String> measurements,
+      List<String> values) throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.testInsertRecord(deviceId, time, measurements, values);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // the connection is broken: remove this session and retry with another one.
+        logger.warn("testInsertRecord failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        // return the session before propagating; unchecked failures are included so that
+        // the borrowed session is never leaked (same handling as the typed overload)
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * This method does NOT insert data into the database: the server just returns after
+   * accepting the request. It should be used to test other time cost in the client.
+   */
+  public void testInsertRecord(String deviceId, long time, List<String> measurements,
+      List<TSDataType> types, List<Object> values)
+      throws IoTDBConnectionException, StatementExecutionException {
+    int attempt = 0;
+    while (attempt < RETRY) {
+      final Session borrowed = getSession();
+      try {
+        borrowed.testInsertRecord(deviceId, time, measurements, types, values);
+        putBack(borrowed);
+        return;
+      } catch (IoTDBConnectionException connectionError) {
+        // broken connection: discard this session, the next round borrows another one
+        logger.warn("testInsertRecord failed", connectionError);
+        cleanSessionAndMayThrowConnectionException(borrowed, attempt, connectionError);
+      } catch (StatementExecutionException | RuntimeException failure) {
+        // the session is handed back before the failure is propagated
+        putBack(borrowed);
+        throw failure;
+      }
+      attempt++;
+    }
+  }
+
+  /**
+   * Delete a timeseries, including data and schema.
+   *
+   * @param path timeseries to delete, should be a whole path
+   */
+  public void deleteTimeseries(String path)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.deleteTimeseries(path);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // the connection is broken: remove this session and retry with another one.
+        logger.warn("deleteTimeseries failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        // return the session before propagating; unchecked failures are included so that
+        // the borrowed session is never leaked
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Delete several timeseries, including data and schema.
+   *
+   * @param paths timeseries to delete, each should be a whole path
+   */
+  public void deleteTimeseries(List<String> paths)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.deleteTimeseries(paths);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // the connection is broken: remove this session and retry with another one.
+        logger.warn("deleteTimeseries failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        // return the session before propagating; unchecked failures are included so that
+        // the borrowed session is never leaked
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Delete data <= time in one timeseries.
+   *
+   * @param path data in which time series to delete
+   * @param time data with time stamp less than or equal to time will be deleted
+   */
+  public void deleteData(String path, long time)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.deleteData(path, time);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // the connection is broken: remove this session and retry with another one.
+        logger.warn("deleteData failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        // return the session before propagating; unchecked failures are included so that
+        // the borrowed session is never leaked
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Delete data <= time in multiple timeseries.
+   *
+   * @param paths data in which time series to delete
+   * @param time  data with time stamp less than or equal to time will be deleted
+   */
+  public void deleteData(List<String> paths, long time)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.deleteData(paths, time);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // the connection is broken: remove this session and retry with another one.
+        logger.warn("deleteData failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        // return the session before propagating; unchecked failures are included so that
+        // the borrowed session is never leaked
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Delete data >= startTime and data <= endTime in multiple timeseries.
+   *
+   * @param paths     data in which time series to delete
+   * @param startTime delete range start time
+   * @param endTime   delete range end time
+   */
+  public void deleteData(List<String> paths, long startTime, long endTime)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.deleteData(paths, startTime, endTime);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // the connection is broken: remove this session and retry with another one.
+        logger.warn("deleteData failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        // return the session before propagating; unchecked failures are included so that
+        // the borrowed session is never leaked
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Run Session#setStorageGroup on a pooled session. A broken connection discards the session
+   * and the call is tried again, at most RETRY attempts in total.
+   *
+   * @param storageGroupId handed unchanged to Session#setStorageGroup
+   */
+  public void setStorageGroup(String storageGroupId)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.setStorageGroup(storageGroupId);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // the connection is broken: remove this session and retry with another one.
+        logger.warn("setStorageGroup failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        // return the session before propagating; unchecked failures are included so that
+        // the borrowed session is never leaked
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Run Session#deleteStorageGroup on a pooled session. A broken connection discards the
+   * session and the call is tried again, at most RETRY attempts in total.
+   *
+   * @param storageGroup handed unchanged to Session#deleteStorageGroup
+   */
+  public void deleteStorageGroup(String storageGroup)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.deleteStorageGroup(storageGroup);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // the connection is broken: remove this session and retry with another one.
+        logger.warn("deleteStorageGroup failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        // return the session before propagating; unchecked failures are included so that
+        // the borrowed session is never leaked
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Run Session#deleteStorageGroups on a pooled session. A broken connection discards the
+   * session and the call is tried again, at most RETRY attempts in total.
+   *
+   * @param storageGroup handed unchanged to Session#deleteStorageGroups
+   */
+  public void deleteStorageGroups(List<String> storageGroup)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.deleteStorageGroups(storageGroup);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // the connection is broken: remove this session and retry with another one.
+        logger.warn("deleteStorageGroups failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        // return the session before propagating; unchecked failures are included so that
+        // the borrowed session is never leaked
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Run Session#createTimeseries on a pooled session; all arguments are passed through
+   * unchanged. A broken connection discards the session and the call is tried again, at most
+   * RETRY attempts in total.
+   */
+  public void createTimeseries(String path, TSDataType dataType, TSEncoding encoding,
+      CompressionType compressor) throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.createTimeseries(path, dataType, encoding, compressor);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // the connection is broken: remove this session and retry with another one.
+        logger.warn("createTimeseries failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        // return the session before propagating; unchecked failures are included so that
+        // the borrowed session is never leaked
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Run the extended Session#createTimeseries (props, tags, attributes, alias) on a pooled
+   * session; all arguments are passed through unchanged. A broken connection discards the
+   * session and the call is tried again, at most RETRY attempts in total.
+   */
+  public void createTimeseries(String path, TSDataType dataType, TSEncoding encoding,
+      CompressionType compressor, Map<String, String> props, Map<String, String> tags,
+      Map<String, String> attributes, String measurementAlias)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.createTimeseries(path, dataType, encoding, compressor, props, tags, attributes,
+            measurementAlias);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // the connection is broken: remove this session and retry with another one.
+        logger.warn("createTimeseries failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        // return the session before propagating; unchecked failures are included so that
+        // the borrowed session is never leaked
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Run Session#createMultiTimeseries on a pooled session; all arguments are passed through
+   * unchanged. A broken connection discards the session and the call is tried again, at most
+   * RETRY attempts in total.
+   */
+  public void createMultiTimeseries(List<String> paths, List<TSDataType> dataTypes,
+      List<TSEncoding> encodings, List<CompressionType> compressors,
+      List<Map<String, String>> propsList, List<Map<String, String>> tagsList,
+      List<Map<String, String>> attributesList, List<String> measurementAliasList)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.createMultiTimeseries(paths, dataTypes, encodings, compressors, propsList, tagsList,
+            attributesList, measurementAliasList);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // the connection is broken: remove this session and retry with another one.
+        logger.warn("createMultiTimeseries failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        // return the session before propagating; unchecked failures are included so that
+        // the borrowed session is never leaked
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Run Session#checkTimeseriesExists on a pooled session. A broken connection discards the
+   * session and the call is tried again, at most RETRY attempts in total.
+   *
+   * @param path handed unchanged to Session#checkTimeseriesExists
+   * @return the value reported by Session#checkTimeseriesExists
+   */
+  public boolean checkTimeseriesExists(String path)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        boolean resp = session.checkTimeseriesExists(path);
+        putBack(session);
+        return resp;
+      } catch (IoTDBConnectionException e) {
+        // the connection is broken: remove this session and retry with another one.
+        logger.warn("checkTimeseriesExists failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        // return the session before propagating; unchecked failures are included so that
+        // the borrowed session is never leaked
+        putBack(session);
+        throw e;
+      }
+    }
+    //never go here.
+    return false;
+  }
+
+  /**
+   * Execute a query sql. Users must call closeResultSet(SessionDataSetWrapper) once they no
+   * longer use the SessionDataSet; they do not need to call
+   * sessionDataSet.closeOperationHandle() themselves any more.
+   *
+   * @param sql query statement
+   * @return result set. Notice that you must keep the returned instance and close it,
+   *     otherwise the session it occupies is never given back to the pool
+   */
+  public SessionDataSetWrapper executeQueryStatement(String sql)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        SessionDataSet resp = session.executeQueryStatement(sql);
+        SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this);
+        // the session stays with the wrapper until closeResultSet is called
+        occupy(session);
+        return wrapper;
+      } catch (IoTDBConnectionException e) {
+        // the connection is broken: remove this session and retry with another one.
+        logger.warn("executeQueryStatement failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        // return the session before propagating; unchecked failures are included so that
+        // the borrowed session is never leaked
+        putBack(session);
+        throw e;
+      }
+    }
+    // never go here
+    return null;
+  }
+
+  /**
+   * Execute a non query statement.
+   *
+   * @param sql non query statement
+   */
+  public void executeNonQueryStatement(String sql)
+      throws StatementExecutionException, IoTDBConnectionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        session.executeNonQueryStatement(sql);
+        putBack(session);
+        return;
+      } catch (IoTDBConnectionException e) {
+        // the connection is broken: remove this session and retry with another one.
+        logger.warn("executeNonQueryStatement failed", e);
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        // return the session before propagating; unchecked failures are included so that
+        // the borrowed session is never leaked
+        putBack(session);
+        throw e;
+      }
+    }
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiDeviceIT.java 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiDeviceIT.java
new file mode 100644
index 0000000..a9511a4
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiDeviceIT.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.integration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Integration test that writes the same time ranges to several devices under root.fans and
+ * root.car and checks that {@code select * from root} returns the expected number of rows in
+ * strictly increasing time order, both before and after deleting part of the data.
+ *
+ * <p>Notice that every test whose name begins with "IoTDB" is an integration test. Every test
+ * that starts the IoTDB server should be defined as an integration test.
+ */
+public class IoTDBMultiDeviceIT {
+
+  private static final String JDBC_URL = Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/";
+  private static final String USER = "root";
+  private static final String PASSWORD = "root";
+  private static final String SELECT_ALL_SQL = "select * from root";
+
+  private static final TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig();
+
+  // configuration values captured in setUp so that tearDown can restore them
+  private static int maxNumberOfPointsInPage;
+  private static int pageSizeInByte;
+  private static int groupSizeInByte;
+  private static long prevPartitionInterval;
+  private static long prevMemtableSizeThreshold;
+
+  /** Shrinks page/group/memtable/partition settings, starts the environment and loads data. */
+  @Before
+  public void setUp() throws Exception {
+    EnvironmentUtils.closeStatMonitor();
+    IoTDBDescriptor.getInstance().getConfig()
+        .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+
+    // origin values
+    maxNumberOfPointsInPage = tsFileConfig.getMaxNumberOfPointsInPage();
+    pageSizeInByte = tsFileConfig.getPageSizeInByte();
+    groupSizeInByte = tsFileConfig.getGroupSizeInByte();
+    prevMemtableSizeThreshold =
+        IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold();
+    prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval();
+
+    // new values: use small page setting
+    tsFileConfig.setMaxNumberOfPointsInPage(1000);
+    tsFileConfig.setPageSizeInByte(1024 * 150);
+    tsFileConfig.setGroupSizeInByte(1024 * 1000);
+    IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 1000);
+    IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(100);
+    TSFileDescriptor.getInstance().getConfig().setCompressor("LZ4");
+
+    EnvironmentUtils.envSetUp();
+
+    insertData();
+  }
+
+  /** Restores the settings changed in setUp and cleans the environment. */
+  @After
+  public void tearDown() throws Exception {
+    // recovery values
+    tsFileConfig.setMaxNumberOfPointsInPage(maxNumberOfPointsInPage);
+    tsFileConfig.setPageSizeInByte(pageSizeInByte);
+    tsFileConfig.setGroupSizeInByte(groupSizeInByte);
+    EnvironmentUtils.cleanEnv();
+    IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval);
+    // restore the threshold that was in effect before setUp (previously the TsFile group size
+    // was written here, which changed the setting for every test that ran afterwards)
+    IoTDBDescriptor.getInstance().getConfig()
+        .setMemtableSizeThreshold(prevMemtableSizeThreshold);
+    // NOTE(review): compressor and compaction strategy are reset to fixed values rather than to
+    // what was configured before setUp; presumably these are the defaults - confirm
+    TSFileDescriptor.getInstance().getConfig().setCompressor("SNAPPY");
+    IoTDBDescriptor.getInstance().getConfig()
+        .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+  }
+
+  /**
+   * Creates the schema and writes the test data: three ranges, then flush and merge, then two
+   * more ranges. Any failure while talking to the server fails the test.
+   */
+  private static void insertData()
+      throws ClassNotFoundException, SQLException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection = DriverManager.getConnection(JDBC_URL, USER, PASSWORD);
+        Statement statement = connection.createStatement()) {
+
+      for (String sql : TestConstant.create_sql) {
+        statement.execute(sql);
+      }
+
+      statement.execute("SET STORAGE GROUP TO root.fans");
+      statement.execute("CREATE TIMESERIES root.fans.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE");
+      statement.execute("CREATE TIMESERIES root.fans.d1.s0 WITH DATATYPE=INT32, ENCODING=RLE");
+      statement.execute("CREATE TIMESERIES root.fans.d2.s0 WITH DATATYPE=INT32, ENCODING=RLE");
+      statement.execute("CREATE TIMESERIES root.fans.d3.s0 WITH DATATYPE=INT32, ENCODING=RLE");
+      statement.execute("CREATE TIMESERIES root.car.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE");
+      statement.execute("CREATE TIMESERIES root.car.d1.s1 WITH DATATYPE=INT64, ENCODING=RLE");
+      statement.execute("CREATE TIMESERIES root.car.d2.s1 WITH DATATYPE=INT64, ENCODING=RLE");
+
+      // insert of data, time range : 0 ~ 1000
+      insertRange(statement, 0, 1000);
+
+      // insert large amount of data, time range : 13700 ~ 24000
+      insertRange(statement, 13700, 24000);
+
+      // insert large amount of data, time range : 3000 ~ 13600 (earlier than the previous range)
+      insertRange(statement, 3000, 13600);
+
+      statement.execute("flush");
+      statement.execute("merge");
+
+      // unsequential data, memory data (overlaps timestamps written before the flush)
+      insertRange(statement, 10000, 11000);
+
+      // sequential data, memory data
+      insertRange(statement, 200000, 201000);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Writes one point per device for every timestamp in {@code [startTime, endTime)}.
+   *
+   * <p>NOTE(review): the root.car series are created above as s1, but the inserts write to s0;
+   * presumably this relies on automatic schema creation - confirm.
+   */
+  private static void insertRange(Statement statement, int startTime, int endTime)
+      throws SQLException {
+    for (int time = startTime; time < endTime; time++) {
+      insertPoint(statement, "root.fans.d0", time, 70);
+      insertPoint(statement, "root.fans.d1", time, 40);
+      insertPoint(statement, "root.fans.d2", time, 70);
+      insertPoint(statement, "root.fans.d3", time, 40);
+      insertPoint(statement, "root.car.d0", time, 70);
+      insertPoint(statement, "root.car.d1", time, 40);
+      insertPoint(statement, "root.car.d2", time, 40);
+    }
+  }
+
+  /** Inserts the value {@code time % modulus} into sensor s0 of {@code device} at {@code time}. */
+  private static void insertPoint(Statement statement, String device, int time, int modulus)
+      throws SQLException {
+    statement.execute(
+        String.format("insert into %s(timestamp,s0) values(%s,%s)", device, time, time % modulus));
+  }
+
+  /**
+   * Runs "select * from root" and checks that the timestamps are strictly increasing and that
+   * exactly {@code expectedRows} rows are returned.
+   */
+  private static void assertOrderedRowCount(Statement statement, int expectedRows)
+      throws SQLException {
+    boolean hasResultSet = statement.execute(SELECT_ALL_SQL);
+    Assert.assertTrue(hasResultSet);
+    try (ResultSet resultSet = statement.getResultSet()) {
+      int cnt = 0;
+      long before = -1;
+      while (resultSet.next()) {
+        long cur = Long.parseLong(resultSet.getString(TestConstant.TIMESTAMP_STR));
+        if (cur <= before) {
+          fail("time order wrong!");
+        }
+        before = cur;
+        cnt++;
+      }
+      assertEquals(expectedRows, cnt);
+    }
+  }
+
+  // "select * from root" : all inserted timestamps come back in order.
+  // 1000 + 10300 + 10600 + 1000 distinct timestamps = 22900 rows.
+  @Test
+  public void selectAllTest() throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection = DriverManager.getConnection(JDBC_URL, USER, PASSWORD);
+        Statement statement = connection.createStatement()) {
+      assertOrderedRowCount(statement, 22900);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // "select * from root" after deleting the first 1000 timestamps and the last 500:
+  // 22900 - 1000 - 500 = 21400 rows.
+  @Test
+  public void selectAfterDeleteTest() throws ClassNotFoundException {
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    try (Connection connection = DriverManager.getConnection(JDBC_URL, USER, PASSWORD);
+        Statement statement = connection.createStatement()) {
+
+      statement.execute("DELETE FROM root.fans.* WHERE time <= 1000");
+      statement.execute("DELETE FROM root.car.* WHERE time <= 1000");
+      statement.execute("DELETE FROM root.fans.* WHERE time >= 200500 and time < 201000");
+      statement.execute("DELETE FROM root.car.* WHERE time >= 200500 and time < 201000");
+
+      assertOrderedRowCount(statement, 21400);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+}
diff --git a/thrift/src/main/thrift/rpc.thrift 
b/thrift/src/main/thrift/rpc.thrift
index 7f29bbd..827cc3a 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -221,6 +221,7 @@ struct TSInsertTabletsReq {
   5: required list<binary> timestampsList
   6: required list<list<i32>> typesList
   7: required list<i32> sizeList
+  8: optional bool isFinal
 }
 
 struct TSInsertRecordsReq {

Reply via email to