This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 98310822b32 Session insert request won't fail while rolling upgrade
98310822b32 is described below

commit 98310822b32ac95bc693f014715d46c6687ee3f7
Author: Jackie Tien <[email protected]>
AuthorDate: Sun Jan 28 10:26:04 2024 +0800

    Session insert request won't fail while rolling upgrade
---
 .../org/apache/iotdb/isession/SessionConfig.java   |    4 +
 .../java/org/apache/iotdb/session/Session.java     |   27 +-
 .../apache/iotdb/session/SessionConnection.java    |  660 ++++++++--
 .../org/apache/iotdb/session/pool/SessionPool.java | 1289 +++++++++-----------
 .../iotdb/session/SessionConnectionTest.java       |    9 +-
 .../iotdb/db/queryengine/plan/Coordinator.java     |   11 +-
 .../plan/scheduler/AsyncSendPlanNodeHandler.java   |    4 +-
 .../scheduler/FragmentInstanceDispatcherImpl.java  |   38 +-
 .../apache/iotdb/db/utils/ErrorHandlingUtils.java  |   32 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |    4 +
 .../apache/iotdb/commons/utils/StatusUtils.java    |   52 +
 .../thrift-commons/src/main/thrift/common.thrift   |    1 +
 12 files changed, 1289 insertions(+), 842 deletions(-)

diff --git 
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
 
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
index 8004201ebeb..ac14a99c80f 100644
--- 
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
+++ 
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
@@ -52,5 +52,9 @@ public class SessionConfig {
 
   public static final boolean DEFAULT_ENABLE_AUTO_FETCH = true;
 
+  public static final int MAX_RETRY_COUNT = 60;
+
+  public static final long RETRY_INTERVAL_IN_MS = 500;
+
   private SessionConfig() {}
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index b05514cf821..4d0ad33efd7 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -162,6 +162,10 @@ public class Session implements ISession {
   // default enable
   protected boolean enableAutoFetch = true;
 
+  protected int maxRetryCount = SessionConfig.MAX_RETRY_COUNT;
+
+  protected long retryIntervalInMs = SessionConfig.RETRY_INTERVAL_IN_MS;
+
   private static final String REDIRECT_TWICE = "redirect twice";
 
   private static final String REDIRECT_TWICE_RETRY = "redirect twice, please 
try again.";
@@ -421,6 +425,8 @@ public class Session implements ISession {
     this.trustStore = builder.trustStore;
     this.trustStorePwd = builder.trustStorePwd;
     this.enableAutoFetch = builder.enableAutoFetch;
+    this.maxRetryCount = builder.maxRetryCount;
+    this.retryIntervalInMs = builder.retryIntervalInMs;
   }
 
   @Override
@@ -580,9 +586,11 @@ public class Session implements ISession {
   public SessionConnection constructSessionConnection(
       Session session, TEndPoint endpoint, ZoneId zoneId) throws 
IoTDBConnectionException {
     if (endpoint == null) {
-      return new SessionConnection(session, zoneId, availableNodes);
+      return new SessionConnection(
+          session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs);
     }
-    return new SessionConnection(session, endpoint, zoneId, availableNodes);
+    return new SessionConnection(
+        session, endpoint, zoneId, availableNodes, maxRetryCount, 
retryIntervalInMs);
   }
 
   @Override
@@ -1261,7 +1269,6 @@ public class Session implements ISession {
               });
       if (connection == null) {
         deviceIdToEndpoint.remove(deviceId);
-        logger.warn("Can not redirect to {}, because session can not connect 
to it.", endpoint);
       }
     }
   }
@@ -3551,6 +3558,10 @@ public class Session implements ISession {
     private String trustStore;
     private String trustStorePwd;
 
+    private int maxRetryCount = SessionConfig.MAX_RETRY_COUNT;
+
+    private long retryIntervalInMs = SessionConfig.RETRY_INTERVAL_IN_MS;
+
     public Builder useSSL(boolean useSSL) {
       this.useSSL = useSSL;
       return this;
@@ -3633,6 +3644,16 @@ public class Session implements ISession {
       return this;
     }
 
+    public Builder maxRetryCount(int maxRetryCount) {
+      this.maxRetryCount = maxRetryCount;
+      return this;
+    }
+
+    public Builder retryIntervalInMs(long retryIntervalInMs) {
+      this.retryIntervalInMs = retryIntervalInMs;
+      return this;
+    }
+
     public Session build() {
       if (nodeUrls != null
           && (!SessionConfig.DEFAULT_HOST.equals(host) || rpcPort != 
SessionConfig.DEFAULT_PORT)) {
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index fdda3c31fb7..6cb618e8fa9 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -80,6 +80,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.StringJoiner;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
 public class SessionConnection {
@@ -99,19 +100,32 @@ public class SessionConnection {
 
   private final Supplier<List<TEndPoint>> availableNodes;
 
+  private final int maxRetryCount;
+
+  private final long retryIntervalInMs;
+
   // TestOnly
   public SessionConnection() {
     availableNodes = Collections::emptyList;
+    this.maxRetryCount = Math.max(0, SessionConfig.MAX_RETRY_COUNT);
+    this.retryIntervalInMs = Math.max(0, SessionConfig.RETRY_INTERVAL_IN_MS);
   }
 
   public SessionConnection(
-      Session session, TEndPoint endPoint, ZoneId zoneId, 
Supplier<List<TEndPoint>> availableNodes)
+      Session session,
+      TEndPoint endPoint,
+      ZoneId zoneId,
+      Supplier<List<TEndPoint>> availableNodes,
+      int maxRetryCount,
+      long retryIntervalInMs)
       throws IoTDBConnectionException {
     this.session = session;
     this.endPoint = endPoint;
     endPointList.add(endPoint);
     this.zoneId = zoneId == null ? ZoneId.systemDefault() : zoneId;
     this.availableNodes = availableNodes;
+    this.maxRetryCount = Math.max(0, maxRetryCount);
+    this.retryIntervalInMs = Math.max(0, retryIntervalInMs);
     try {
       init(endPoint, session.useSSL, session.trustStore, 
session.trustStorePwd);
     } catch (IoTDBConnectionException e) {
@@ -119,12 +133,19 @@ public class SessionConnection {
     }
   }
 
-  public SessionConnection(Session session, ZoneId zoneId, 
Supplier<List<TEndPoint>> availableNodes)
+  public SessionConnection(
+      Session session,
+      ZoneId zoneId,
+      Supplier<List<TEndPoint>> availableNodes,
+      int maxRetryCount,
+      long retryIntervalInMs)
       throws IoTDBConnectionException {
     this.session = session;
     this.zoneId = zoneId == null ? ZoneId.systemDefault() : zoneId;
     this.endPointList = SessionUtils.parseSeedNodeUrls(session.nodeUrls);
     this.availableNodes = availableNodes;
+    this.maxRetryCount = Math.max(0, maxRetryCount);
+    this.retryIntervalInMs = Math.max(0, retryIntervalInMs);
     initClusterConn();
   }
 
@@ -414,24 +435,56 @@ public class SessionConnection {
 
   protected void executeNonQueryStatement(String sql)
       throws IoTDBConnectionException, StatementExecutionException {
-    TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, 
statementId);
-    try {
-      execReq.setEnableRedirectQuery(enableRedirect);
-      TSExecuteStatementResp execResp = 
client.executeUpdateStatementV2(execReq);
-      RpcUtils.verifySuccess(execResp.getStatus());
-    } catch (TException e) {
-      if (reconnect()) {
+
+    TSExecuteStatementReq request = new TSExecuteStatementReq(sessionId, sql, 
statementId);
+
+    TException lastTException = null;
+    TSStatus status = null;
+    for (int i = 0; i <= maxRetryCount; i++) {
+      if (i > 0) {
+        // re-init the TException and TSStatus
+        lastTException = null;
+        status = null;
+        // not first time, we need to sleep and then reconnect
         try {
-          execReq.setSessionId(sessionId);
-          execReq.setStatementId(statementId);
-          
RpcUtils.verifySuccess(client.executeUpdateStatementV2(execReq).status);
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
+          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+        } catch (InterruptedException e) {
+          // just ignore
         }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
+        if (!reconnect()) {
+          // reconnect failed, just continue to make another retry.
+          continue;
+        }
+      }
+      try {
+        status = executeNonQueryStatementInternal(request);
+        // need retry
+        if (status.isSetNeedRetry() && status.isNeedRetry()) {
+          continue;
+        }
+        // succeed or don't need to retry
+        RpcUtils.verifySuccess(status);
+        return;
+      } catch (TException e) {
+        // all network exception need retry until reaching maxRetryCount
+        lastTException = e;
       }
     }
+
+    if (status != null) {
+      RpcUtils.verifySuccess(status);
+    } else if (lastTException != null) {
+      throw new IoTDBConnectionException(lastTException);
+    } else {
+      throw new IoTDBConnectionException(logForReconnectionFailure());
+    }
+  }
+
+  private TSStatus executeNonQueryStatementInternal(TSExecuteStatementReq 
request)
+      throws TException {
+    request.setSessionId(sessionId);
+    request.setStatementId(statementId);
+    return client.executeUpdateStatementV2(request).status;
   }
 
   protected SessionDataSet executeRawDataQuery(
@@ -654,193 +707,544 @@ public class SessionConnection {
 
   protected void insertRecord(TSInsertRecordReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccessWithRedirection(client.insertRecord(request));
-    } catch (TException e) {
-      if (reconnect()) {
+    TException lastTException = null;
+    TSStatus status = null;
+    for (int i = 0; i <= maxRetryCount; i++) {
+      if (i > 0) {
+        // re-init the TException and TSStatus
+        lastTException = null;
+        status = null;
+        // not first time, we need to sleep and then reconnect
         try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.insertRecord(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
+          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+        } catch (InterruptedException e) {
+          // just ignore
+        }
+        if (!reconnect()) {
+          // reconnect failed, just continue to make another retry.
+          continue;
         }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
       }
+      try {
+        status = insertRecordInternal(request);
+        // need retry
+        if (status.isSetNeedRetry() && status.isNeedRetry()) {
+          continue;
+        }
+        // succeed or don't need to retry
+        if (i == 0) {
+          // first time succeed, take account for redirection info
+          RpcUtils.verifySuccessWithRedirection(status);
+        } else {
+          // if it's retry, just ignore redirection info
+          RpcUtils.verifySuccess(status);
+        }
+        return;
+      } catch (TException e) {
+        // all network exception need retry until reaching maxRetryCount
+        lastTException = e;
+      }
+    }
+
+    if (status != null) {
+      RpcUtils.verifySuccess(status);
+    } else if (lastTException != null) {
+      throw new IoTDBConnectionException(lastTException);
+    } else {
+      throw new IoTDBConnectionException(logForReconnectionFailure());
     }
   }
 
+  private TSStatus insertRecordInternal(TSInsertRecordReq request) throws 
TException {
+    request.setSessionId(sessionId);
+    return client.insertRecord(request);
+  }
+
   protected void insertRecord(TSInsertStringRecordReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-    request.setSessionId(sessionId);
-    try {
-      
RpcUtils.verifySuccessWithRedirection(client.insertStringRecord(request));
-    } catch (TException e) {
-      if (reconnect()) {
+    TException lastTException = null;
+    TSStatus status = null;
+    for (int i = 0; i <= maxRetryCount; i++) {
+      if (i > 0) {
+        // re-init the TException and TSStatus
+        lastTException = null;
+        status = null;
+        // not first time, we need to sleep and then reconnect
         try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.insertStringRecord(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
+          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+        } catch (InterruptedException e) {
+          // just ignore
+        }
+        if (!reconnect()) {
+          // reconnect failed, just continue to make another retry.
+          continue;
         }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
       }
+      try {
+        status = insertRecordInternal(request);
+        // need retry
+        if (status.isSetNeedRetry() && status.isNeedRetry()) {
+          continue;
+        }
+        // succeed or don't need to retry
+        if (i == 0) {
+          // first time succeed, take account for redirection info
+          RpcUtils.verifySuccessWithRedirection(status);
+        } else {
+          // if it's retry, just ignore redirection info
+          RpcUtils.verifySuccess(status);
+        }
+        return;
+      } catch (TException e) {
+        // all network exception need retry until reaching maxRetryCount
+        lastTException = e;
+      }
+    }
+
+    if (status != null) {
+      RpcUtils.verifySuccess(status);
+    } else if (lastTException != null) {
+      throw new IoTDBConnectionException(lastTException);
+    } else {
+      throw new IoTDBConnectionException(logForReconnectionFailure());
     }
   }
 
+  private TSStatus insertRecordInternal(TSInsertStringRecordReq request) 
throws TException {
+    request.setSessionId(sessionId);
+    return client.insertStringRecord(request);
+  }
+
   protected void insertRecords(TSInsertRecordsReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccessWithRedirectionForMultiDevices(
-          client.insertRecords(request), request.getPrefixPaths());
-    } catch (TException e) {
-      if (reconnect()) {
+    TException lastTException = null;
+    TSStatus status = null;
+    for (int i = 0; i <= maxRetryCount; i++) {
+      if (i > 0) {
+        // re-init the TException and TSStatus
+        lastTException = null;
+        status = null;
+        // not first time, we need to sleep and then reconnect
         try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.insertRecords(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
+          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+        } catch (InterruptedException e) {
+          // just ignore
+        }
+        if (!reconnect()) {
+          // reconnect failed, just continue to make another retry.
+          continue;
         }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
+      }
+      try {
+        status = insertRecordsInternal(request);
+        // need retry
+        if (status.isSetNeedRetry() && status.isNeedRetry()) {
+          continue;
+        }
+        // succeed or don't need to retry
+        if (i == 0) {
+          // first time succeed, take account for redirection info
+          RpcUtils.verifySuccessWithRedirectionForMultiDevices(status, 
request.getPrefixPaths());
+        } else {
+          // if it's retry, just ignore redirection info
+          RpcUtils.verifySuccess(status);
+        }
+        return;
+      } catch (TException e) {
+        // all network exception need retry until reaching maxRetryCount
+        lastTException = e;
       }
     }
+
+    if (status != null) {
+      RpcUtils.verifySuccess(status);
+    } else if (lastTException != null) {
+      throw new IoTDBConnectionException(lastTException);
+    } else {
+      throw new IoTDBConnectionException(logForReconnectionFailure());
+    }
+  }
+
+  private TSStatus insertRecordsInternal(TSInsertRecordsReq request) throws 
TException {
+    request.setSessionId(sessionId);
+    return client.insertRecords(request);
   }
 
   protected void insertRecords(TSInsertStringRecordsReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccessWithRedirectionForMultiDevices(
-          client.insertStringRecords(request), request.getPrefixPaths());
-    } catch (TException e) {
-      if (reconnect()) {
+
+    TException lastTException = null;
+    TSStatus status = null;
+    for (int i = 0; i <= maxRetryCount; i++) {
+      if (i > 0) {
+        // re-init the TException and TSStatus
+        lastTException = null;
+        status = null;
+        // not first time, we need to sleep and then reconnect
         try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.insertStringRecords(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
+          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+        } catch (InterruptedException e) {
+          // just ignore
+        }
+        if (!reconnect()) {
+          // reconnect failed, just continue to make another retry.
+          continue;
         }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
       }
+      try {
+        status = insertRecordsInternal(request);
+        // need retry
+        if (status.isSetNeedRetry() && status.isNeedRetry()) {
+          continue;
+        }
+        // succeed or don't need to retry
+        if (i == 0) {
+          // first time succeed, take account for redirection info
+          RpcUtils.verifySuccessWithRedirectionForMultiDevices(status, 
request.getPrefixPaths());
+        } else {
+          // if it's retry, just ignore redirection info
+          RpcUtils.verifySuccess(status);
+        }
+        return;
+      } catch (TException e) {
+        // all network exception need retry until reaching maxRetryCount
+        lastTException = e;
+      }
+    }
+
+    if (status != null) {
+      RpcUtils.verifySuccess(status);
+    } else if (lastTException != null) {
+      throw new IoTDBConnectionException(lastTException);
+    } else {
+      throw new IoTDBConnectionException(logForReconnectionFailure());
     }
   }
 
+  private TSStatus insertRecordsInternal(TSInsertStringRecordsReq request) 
throws TException {
+    request.setSessionId(sessionId);
+    return client.insertStringRecords(request);
+  }
+
   protected void insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq 
request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-    request.setSessionId(sessionId);
-    try {
-      
RpcUtils.verifySuccessWithRedirection(client.insertRecordsOfOneDevice(request));
-    } catch (TException e) {
-      if (reconnect()) {
+
+    TException lastTException = null;
+    TSStatus status = null;
+    for (int i = 0; i <= maxRetryCount; i++) {
+      if (i > 0) {
+        // re-init the TException and TSStatus
+        lastTException = null;
+        status = null;
+        // not first time, we need to sleep and then reconnect
         try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.insertRecordsOfOneDevice(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
+          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+        } catch (InterruptedException e) {
+          // just ignore
         }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
+        if (!reconnect()) {
+          // reconnect failed, just continue to make another retry.
+          continue;
+        }
+      }
+      try {
+        status = insertRecordsOfOneDeviceInternal(request);
+        // need retry
+        if (status.isSetNeedRetry() && status.isNeedRetry()) {
+          continue;
+        }
+        // succeed or don't need to retry
+        if (i == 0) {
+          // first time succeed, take account for redirection info
+          RpcUtils.verifySuccessWithRedirection(status);
+        } else {
+          // if it's retry, just ignore redirection info
+          RpcUtils.verifySuccess(status);
+        }
+        return;
+      } catch (TException e) {
+        // all network exception need retry until reaching maxRetryCount
+        lastTException = e;
       }
     }
+
+    if (status != null) {
+      RpcUtils.verifySuccess(status);
+    } else if (lastTException != null) {
+      throw new IoTDBConnectionException(lastTException);
+    } else {
+      throw new IoTDBConnectionException(logForReconnectionFailure());
+    }
+  }
+
+  private TSStatus 
insertRecordsOfOneDeviceInternal(TSInsertRecordsOfOneDeviceReq request)
+      throws TException {
+    request.setSessionId(sessionId);
+    return client.insertRecordsOfOneDevice(request);
   }
 
   protected void 
insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-    request.setSessionId(sessionId);
-    try {
-      
RpcUtils.verifySuccessWithRedirection(client.insertStringRecordsOfOneDevice(request));
-    } catch (TException e) {
-      if (reconnect()) {
+
+    TException lastTException = null;
+    TSStatus status = null;
+    for (int i = 0; i <= maxRetryCount; i++) {
+      if (i > 0) {
+        // re-init the TException and TSStatus
+        lastTException = null;
+        status = null;
+        // not first time, we need to sleep and then reconnect
         try {
-          request.setSessionId(sessionId);
-          
RpcUtils.verifySuccess(client.insertStringRecordsOfOneDevice(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
+          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+        } catch (InterruptedException e) {
+          // just ignore
         }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
+        if (!reconnect()) {
+          // reconnect failed, just continue to make another retry.
+          continue;
+        }
+      }
+      try {
+        status = insertStringRecordsOfOneDeviceInternal(request);
+        // need retry
+        if (status.isSetNeedRetry() && status.isNeedRetry()) {
+          continue;
+        }
+        // succeed or don't need to retry
+        if (i == 0) {
+          // first time succeed, take account for redirection info
+          RpcUtils.verifySuccessWithRedirection(status);
+        } else {
+          // if it's retry, just ignore redirection info
+          RpcUtils.verifySuccess(status);
+        }
+        return;
+      } catch (TException e) {
+        // all network exception need retry until reaching maxRetryCount
+        lastTException = e;
       }
     }
+
+    if (status != null) {
+      RpcUtils.verifySuccess(status);
+    } else if (lastTException != null) {
+      throw new IoTDBConnectionException(lastTException);
+    } else {
+      throw new IoTDBConnectionException(logForReconnectionFailure());
+    }
+  }
+
+  private TSStatus insertStringRecordsOfOneDeviceInternal(
+      TSInsertStringRecordsOfOneDeviceReq request) throws TException {
+    request.setSessionId(sessionId);
+    return client.insertStringRecordsOfOneDevice(request);
   }
 
   protected void insertTablet(TSInsertTabletReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccessWithRedirection(client.insertTablet(request));
-    } catch (TException e) {
-      if (reconnect()) {
+
+    TException lastTException = null;
+    TSStatus status = null;
+    for (int i = 0; i <= maxRetryCount; i++) {
+      if (i > 0) {
+        // re-init the TException and TSStatus
+        lastTException = null;
+        status = null;
+        // not first time, we need to sleep and then reconnect
         try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.insertTablet(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
+          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+        } catch (InterruptedException e) {
+          // just ignore
         }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
+        if (!reconnect()) {
+          // reconnect failed, just continue to make another retry.
+          continue;
+        }
+      }
+      try {
+        status = insertTabletInternal(request);
+        // need retry
+        if (status.isSetNeedRetry() && status.isNeedRetry()) {
+          continue;
+        }
+        // succeed or don't need to retry
+        if (i == 0) {
+          // first time succeed, take account for redirection info
+          RpcUtils.verifySuccessWithRedirection(status);
+        } else {
+          // if it's retry, just ignore redirection info
+          RpcUtils.verifySuccess(status);
+        }
+        return;
+      } catch (TException e) {
+        // all network exception need retry until reaching maxRetryCount
+        lastTException = e;
       }
     }
+
+    if (status != null) {
+      RpcUtils.verifySuccess(status);
+    } else if (lastTException != null) {
+      throw new IoTDBConnectionException(lastTException);
+    } else {
+      throw new IoTDBConnectionException(logForReconnectionFailure());
+    }
+  }
+
+  private TSStatus insertTabletInternal(TSInsertTabletReq request) throws 
TException {
+    request.setSessionId(sessionId);
+    return client.insertTablet(request);
   }
 
   protected void insertTablets(TSInsertTabletsReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccessWithRedirectionForMultiDevices(
-          client.insertTablets(request), request.getPrefixPaths());
-    } catch (TException e) {
-      if (reconnect()) {
+
+    TException lastTException = null;
+    TSStatus status = null;
+    for (int i = 0; i <= maxRetryCount; i++) {
+      if (i > 0) {
+        // re-init the TException and TSStatus
+        lastTException = null;
+        status = null;
+        // not first time, we need to sleep and then reconnect
         try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.insertTablets(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
+          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+        } catch (InterruptedException e) {
+          // just ignore
         }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
+        if (!reconnect()) {
+          // reconnect failed, just continue to make another retry.
+          continue;
+        }
+      }
+      try {
+        status = insertTabletsInternal(request);
+        // need retry
+        if (status.isSetNeedRetry() && status.isNeedRetry()) {
+          continue;
+        }
+        // succeed or don't need to retry
+        if (i == 0) {
+          // first time succeed, take account for redirection info
+          RpcUtils.verifySuccessWithRedirectionForMultiDevices(status, 
request.getPrefixPaths());
+        } else {
+          // if it's retry, just ignore redirection info
+          RpcUtils.verifySuccess(status);
+        }
+        return;
+      } catch (TException e) {
+        // all network exception need retry until reaching maxRetryCount
+        lastTException = e;
       }
     }
+
+    if (status != null) {
+      RpcUtils.verifySuccess(status);
+    } else if (lastTException != null) {
+      throw new IoTDBConnectionException(lastTException);
+    } else {
+      throw new IoTDBConnectionException(logForReconnectionFailure());
+    }
+  }
+
+  private TSStatus insertTabletsInternal(TSInsertTabletsReq request) throws 
TException {
+    request.setSessionId(sessionId);
+    return client.insertTablets(request);
   }
 
   protected void deleteTimeseries(List<String> paths)
       throws IoTDBConnectionException, StatementExecutionException {
-    try {
-      RpcUtils.verifySuccess(client.deleteTimeseries(sessionId, paths));
-    } catch (TException e) {
-      if (reconnect()) {
+
+    TException lastTException = null;
+    TSStatus status = null;
+    for (int i = 0; i <= maxRetryCount; i++) {
+      if (i > 0) {
+        // re-init the TException and TSStatus
+        lastTException = null;
+        status = null;
+        // not first time, we need to sleep and then reconnect
         try {
-          RpcUtils.verifySuccess(client.deleteTimeseries(sessionId, paths));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
+          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+        } catch (InterruptedException e) {
+          // just ignore
         }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
+        if (!reconnect()) {
+          // reconnect failed, just continue to make another retry.
+          continue;
+        }
+      }
+      try {
+        status = client.deleteTimeseries(sessionId, paths);
+        // need retry
+        if (status.isSetNeedRetry() && status.isNeedRetry()) {
+          continue;
+        }
+        // succeed or don't need to retry
+        RpcUtils.verifySuccess(status);
+        return;
+      } catch (TException e) {
+        // all network exception need retry until reaching maxRetryCount
+        lastTException = e;
       }
     }
+
+    if (status != null) {
+      RpcUtils.verifySuccess(status);
+    } else if (lastTException != null) {
+      throw new IoTDBConnectionException(lastTException);
+    } else {
+      throw new IoTDBConnectionException(logForReconnectionFailure());
+    }
   }
 
   public void deleteData(TSDeleteDataReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    request.setSessionId(sessionId);
-    try {
-      RpcUtils.verifySuccess(client.deleteData(request));
-    } catch (TException e) {
-      if (reconnect()) {
+
+    TException lastTException = null;
+    TSStatus status = null;
+    for (int i = 0; i <= maxRetryCount; i++) {
+      if (i > 0) {
+        // re-init the TException and TSStatus
+        lastTException = null;
+        status = null;
+        // not first time, we need to sleep and then reconnect
         try {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.deleteData(request));
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
+          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+        } catch (InterruptedException e) {
+          // just ignore
         }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
+        if (!reconnect()) {
+          // reconnect failed, just continue to make another retry.
+          continue;
+        }
+      }
+      try {
+        status = deleteDataInternal(request);
+        // need retry
+        if (status.isSetNeedRetry() && status.isNeedRetry()) {
+          continue;
+        }
+        // succeed or don't need to retry
+        RpcUtils.verifySuccess(status);
+        return;
+      } catch (TException e) {
+        // all network exception need retry until reaching maxRetryCount
+        lastTException = e;
       }
     }
+
+    if (status != null) {
+      RpcUtils.verifySuccess(status);
+    } else if (lastTException != null) {
+      throw new IoTDBConnectionException(lastTException);
+    } else {
+      throw new IoTDBConnectionException(logForReconnectionFailure());
+    }
+  }
+
+  private TSStatus deleteDataInternal(TSDeleteDataReq request) throws 
TException {
+    request.setSessionId(sessionId);
+    return client.deleteData(request);
   }
 
   protected void testInsertRecord(TSInsertStringRecordReq request)
@@ -982,7 +1386,7 @@ public class SessionConnection {
             init(endPoint, session.useSSL, session.trustStore, 
session.trustStorePwd);
             connectedSuccess = true;
           } catch (IoTDBConnectionException e) {
-            logger.error("The current node may have been down {},try next 
node", endPoint);
+            logger.warn("The current node may have been down {},try next 
node", endPoint);
             continue;
           }
           break;
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index ef3f5385738..aab6b118391 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -148,6 +148,10 @@ public class SessionPool implements ISessionPool {
 
   private boolean enableAutoFetch = true;
 
+  protected int maxRetryCount = SessionConfig.MAX_RETRY_COUNT;
+
+  protected long retryIntervalInMs = SessionConfig.RETRY_INTERVAL_IN_MS;
+
   private static final String INSERT_RECORD_FAIL = "insertRecord failed";
 
   private static final String INSERT_RECORD_ERROR_MSG = "unexpected error in 
insertRecord";
@@ -479,6 +483,9 @@ public class SessionPool implements ISessionPool {
     this.useSSL = builder.useSSL;
     this.trustStore = builder.trustStore;
     this.trustStorePwd = builder.trustStorePwd;
+    this.maxRetryCount = builder.maxRetryCount;
+    this.retryIntervalInMs = builder.retryIntervalInMs;
+
     if (enableAutoFetch) {
       initThreadPool();
     }
@@ -529,6 +536,8 @@ public class SessionPool implements ISessionPool {
               .useSSL(useSSL)
               .trustStore(trustStore)
               .trustStorePwd(trustStorePwd)
+              .maxRetryCount(maxRetryCount)
+              .retryIntervalInMs(retryIntervalInMs)
               .build();
     } else {
       // Construct redirect-able Session
@@ -546,6 +555,8 @@ public class SessionPool implements ISessionPool {
               .useSSL(useSSL)
               .trustStore(trustStore)
               .trustStorePwd(trustStorePwd)
+              .maxRetryCount(maxRetryCount)
+              .retryIntervalInMs(retryIntervalInMs)
               .build();
     }
     session.setEnableQueryRedirection(enableQueryRedirection);
@@ -848,24 +859,21 @@ public class SessionPool implements ISessionPool {
      *     3,   3,  3,  3
      */
 
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertTablet(tablet, sorted);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("insertTablet failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in insertTablet", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertTablet(tablet, sorted);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("insertTablet failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in insertTablet", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -894,24 +902,21 @@ public class SessionPool implements ISessionPool {
   @Override
   public void insertAlignedTablet(Tablet tablet, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertAlignedTablet(tablet, sorted);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("insertAlignedTablet failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in insertAlignedTablet", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertAlignedTablet(tablet, sorted);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("insertAlignedTablet failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in insertAlignedTablet", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -946,24 +951,21 @@ public class SessionPool implements ISessionPool {
   @Override
   public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertTablets(tablets, sorted);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("insertTablets failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in insertTablets", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertTablets(tablets, sorted);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("insertTablets failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in insertTablets", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -976,24 +978,21 @@ public class SessionPool implements ISessionPool {
   @Override
   public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertAlignedTablets(tablets, sorted);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("insertAlignedTablets failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in insertAlignedTablets", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertAlignedTablets(tablets, sorted);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("insertAlignedTablets failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in insertAlignedTablets", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1013,24 +1012,21 @@ public class SessionPool implements ISessionPool {
       List<List<TSDataType>> typesList,
       List<List<Object>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertRecords(deviceIds, times, measurementsList, typesList, 
valuesList);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("insertRecords failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error(INSERT_RECORDS_ERROR_MSG, e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertRecords(deviceIds, times, measurementsList, typesList, 
valuesList);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("insertRecords failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error(INSERT_RECORDS_ERROR_MSG, e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1050,25 +1046,22 @@ public class SessionPool implements ISessionPool {
       List<List<TSDataType>> typesList,
       List<List<Object>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertAlignedRecords(
-            multiSeriesIds, times, multiMeasurementComponentsList, typesList, 
valuesList);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("insertAlignedRecords failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in insertAlignedRecords", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertAlignedRecords(
+          multiSeriesIds, times, multiMeasurementComponentsList, typesList, 
valuesList);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("insertAlignedRecords failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in insertAlignedRecords", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1087,25 +1080,22 @@ public class SessionPool implements ISessionPool {
       List<List<TSDataType>> typesList,
       List<List<Object>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertRecordsOfOneDevice(
-            deviceId, times, measurementsList, typesList, valuesList, false);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertRecordsOfOneDevice(
+          deviceId, times, measurementsList, typesList, valuesList, false);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1126,25 +1116,22 @@ public class SessionPool implements ISessionPool {
       List<List<TSDataType>> typesList,
       List<List<Object>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertRecordsOfOneDevice(
-            deviceId, times, measurementsList, typesList, valuesList, false);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertRecordsOfOneDevice(
+          deviceId, times, measurementsList, typesList, valuesList, false);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1163,25 +1150,21 @@ public class SessionPool implements ISessionPool {
       List<List<String>> measurementsList,
       List<List<String>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertStringRecordsOfOneDevice(
-            deviceId, times, measurementsList, valuesList, false);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("insertStringRecordsOfOneDevice failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in insertStringRecordsOfOneDevice", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertStringRecordsOfOneDevice(deviceId, times, 
measurementsList, valuesList, false);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("insertStringRecordsOfOneDevice failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in insertStringRecordsOfOneDevice", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1202,25 +1185,22 @@ public class SessionPool implements ISessionPool {
       List<List<Object>> valuesList,
       boolean haveSorted)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertRecordsOfOneDevice(
-            deviceId, times, measurementsList, typesList, valuesList, 
haveSorted);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertRecordsOfOneDevice(
+          deviceId, times, measurementsList, typesList, valuesList, 
haveSorted);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1243,25 +1223,22 @@ public class SessionPool implements ISessionPool {
       List<List<Object>> valuesList,
       boolean haveSorted)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertRecordsOfOneDevice(
-            deviceId, times, measurementsList, typesList, valuesList, 
haveSorted);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertRecordsOfOneDevice(
+          deviceId, times, measurementsList, typesList, valuesList, 
haveSorted);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1282,25 +1259,22 @@ public class SessionPool implements ISessionPool {
       List<List<String>> valuesList,
       boolean haveSorted)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertStringRecordsOfOneDevice(
-            deviceId, times, measurementsList, valuesList, haveSorted);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("insertStringRecordsOfOneDevice failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in insertStringRecordsOfOneDevice", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertStringRecordsOfOneDevice(
+          deviceId, times, measurementsList, valuesList, haveSorted);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("insertStringRecordsOfOneDevice failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in insertStringRecordsOfOneDevice", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1320,25 +1294,22 @@ public class SessionPool implements ISessionPool {
       List<List<TSDataType>> typesList,
       List<List<Object>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertAlignedRecordsOfOneDevice(
-            deviceId, times, measurementsList, typesList, valuesList, false);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("insertAlignedRecordsOfOneDevice failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in insertAlignedRecordsOfOneDevice", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertAlignedRecordsOfOneDevice(
+          deviceId, times, measurementsList, typesList, valuesList, false);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("insertAlignedRecordsOfOneDevice failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in insertAlignedRecordsOfOneDevice", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1357,25 +1328,21 @@ public class SessionPool implements ISessionPool {
       List<List<String>> measurementsList,
       List<List<String>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertAlignedStringRecordsOfOneDevice(
-            deviceId, times, measurementsList, valuesList);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("insertAlignedStringRecordsOfOneDevice failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in 
insertAlignedStringRecordsOfOneDevice", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertAlignedStringRecordsOfOneDevice(deviceId, times, 
measurementsList, valuesList);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("insertAlignedStringRecordsOfOneDevice failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in 
insertAlignedStringRecordsOfOneDevice", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1397,25 +1364,23 @@ public class SessionPool implements ISessionPool {
       List<List<Object>> valuesList,
       boolean haveSorted)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertAlignedRecordsOfOneDevice(
-            deviceId, times, measurementsList, typesList, valuesList, 
haveSorted);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("insertAlignedRecordsOfOneDevice failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in insertAlignedRecordsOfOneDevice", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertAlignedRecordsOfOneDevice(
+          deviceId, times, measurementsList, typesList, valuesList, 
haveSorted);
+      putBack(session);
+      return;
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("insertAlignedRecordsOfOneDevice failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in insertAlignedRecordsOfOneDevice", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1436,25 +1401,22 @@ public class SessionPool implements ISessionPool {
       List<List<String>> valuesList,
       boolean haveSorted)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertAlignedStringRecordsOfOneDevice(
-            deviceId, times, measurementsList, valuesList, haveSorted);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("insertAlignedStringRecordsOfOneDevice failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in 
insertAlignedStringRecordsOfOneDevice", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertAlignedStringRecordsOfOneDevice(
+          deviceId, times, measurementsList, valuesList, haveSorted);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("insertAlignedStringRecordsOfOneDevice failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in 
insertAlignedStringRecordsOfOneDevice", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1472,24 +1434,21 @@ public class SessionPool implements ISessionPool {
       List<List<String>> measurementsList,
       List<List<String>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertRecords(deviceIds, times, measurementsList, valuesList);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("insertRecords failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error(INSERT_RECORDS_ERROR_MSG, e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertRecords(deviceIds, times, measurementsList, valuesList);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("insertRecords failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error(INSERT_RECORDS_ERROR_MSG, e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1507,25 +1466,22 @@ public class SessionPool implements ISessionPool {
       List<List<String>> multiMeasurementComponentsList,
       List<List<String>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertAlignedRecords(
-            multiSeriesIds, times, multiMeasurementComponentsList, valuesList);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("insertAlignedRecords failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in insertAlignedRecords", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertAlignedRecords(
+          multiSeriesIds, times, multiMeasurementComponentsList, valuesList);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("insertAlignedRecords failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in insertAlignedRecords", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1544,24 +1500,21 @@ public class SessionPool implements ISessionPool {
       List<TSDataType> types,
       Object... values)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertRecord(deviceId, time, measurements, types, values);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.error(INSERT_RECORD_FAIL, e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error(INSERT_RECORD_ERROR_MSG, e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertRecord(deviceId, time, measurements, types, values);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.error(INSERT_RECORD_FAIL, e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error(INSERT_RECORD_ERROR_MSG, e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1580,24 +1533,21 @@ public class SessionPool implements ISessionPool {
       List<TSDataType> types,
       List<Object> values)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertRecord(deviceId, time, measurements, types, values);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn(INSERT_RECORD_FAIL, e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error(INSERT_RECORD_ERROR_MSG, e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertRecord(deviceId, time, measurements, types, values);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn(INSERT_RECORD_FAIL, e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error(INSERT_RECORD_ERROR_MSG, e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1642,24 +1592,21 @@ public class SessionPool implements ISessionPool {
       List<TSDataType> types,
       List<Object> values)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertAlignedRecord(multiSeriesId, time, 
multiMeasurementComponents, types, values);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("insertAlignedRecord failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in insertAlignedRecord", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertAlignedRecord(multiSeriesId, time, 
multiMeasurementComponents, types, values);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("insertAlignedRecord failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in insertAlignedRecord", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1674,24 +1621,21 @@ public class SessionPool implements ISessionPool {
   public void insertRecord(
       String deviceId, long time, List<String> measurements, List<String> 
values)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertRecord(deviceId, time, measurements, values);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn(INSERT_RECORD_FAIL, e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error(INSERT_RECORD_ERROR_MSG, e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertRecord(deviceId, time, measurements, values);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn(INSERT_RECORD_FAIL, e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error(INSERT_RECORD_ERROR_MSG, e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1706,52 +1650,46 @@ public class SessionPool implements ISessionPool {
   public void insertAlignedRecord(
       String multiSeriesId, long time, List<String> 
multiMeasurementComponents, List<String> values)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.insertAlignedRecord(multiSeriesId, time, 
multiMeasurementComponents, values);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("insertAlignedRecord failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in insertAlignedRecord", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  /**
-   * This method NOT insert data into database and the server just return 
after accept the request,
-   * this method should be used to test other time cost in client
-   */
-  @Override
-  public void testInsertTablet(Tablet tablet)
-      throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.testInsertTablet(tablet);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("testInsertTablet failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in testInsertTablet", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.insertAlignedRecord(multiSeriesId, time, 
multiMeasurementComponents, values);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("insertAlignedRecord failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in insertAlignedRecord", e);
+      putBack(session);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * This method NOT insert data into database and the server just return 
after accept the request,
+   * this method should be used to test other time cost in client
+   */
+  @Override
+  public void testInsertTablet(Tablet tablet)
+      throws IoTDBConnectionException, StatementExecutionException {
+    ISession session = getSession();
+    try {
+      session.testInsertTablet(tablet);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("testInsertTablet failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in testInsertTablet", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1762,24 +1700,21 @@ public class SessionPool implements ISessionPool {
   @Override
   public void testInsertTablet(Tablet tablet, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.testInsertTablet(tablet, sorted);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("testInsertTablet failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in testInsertTablet", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.testInsertTablet(tablet, sorted);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("testInsertTablet failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in testInsertTablet", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1790,24 +1725,21 @@ public class SessionPool implements ISessionPool {
   @Override
   public void testInsertTablets(Map<String, Tablet> tablets)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.testInsertTablets(tablets);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("testInsertTablets failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in testInsertTablets", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.testInsertTablets(tablets);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("testInsertTablets failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in testInsertTablets", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1818,24 +1750,21 @@ public class SessionPool implements ISessionPool {
   @Override
   public void testInsertTablets(Map<String, Tablet> tablets, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.testInsertTablets(tablets, sorted);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("testInsertTablets failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in testInsertTablets", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.testInsertTablets(tablets, sorted);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("testInsertTablets failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in testInsertTablets", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1850,24 +1779,21 @@ public class SessionPool implements ISessionPool {
       List<List<String>> measurementsList,
       List<List<String>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.testInsertRecords(deviceIds, times, measurementsList, 
valuesList);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("testInsertRecords failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in testInsertRecords", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.testInsertRecords(deviceIds, times, measurementsList, 
valuesList);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("testInsertRecords failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in testInsertRecords", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1883,24 +1809,21 @@ public class SessionPool implements ISessionPool {
       List<List<TSDataType>> typesList,
       List<List<Object>> valuesList)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.testInsertRecords(deviceIds, times, measurementsList, 
typesList, valuesList);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("testInsertRecords failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in testInsertRecords", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.testInsertRecords(deviceIds, times, measurementsList, typesList, 
valuesList);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("testInsertRecords failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in testInsertRecords", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1912,24 +1835,21 @@ public class SessionPool implements ISessionPool {
   public void testInsertRecord(
       String deviceId, long time, List<String> measurements, List<String> 
values)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.testInsertRecord(deviceId, time, measurements, values);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("testInsertRecord failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in testInsertRecord", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.testInsertRecord(deviceId, time, measurements, values);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("testInsertRecord failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in testInsertRecord", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1945,24 +1865,21 @@ public class SessionPool implements ISessionPool {
       List<TSDataType> types,
       List<Object> values)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.testInsertRecord(deviceId, time, measurements, types, values);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("testInsertRecord failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in testInsertRecord", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.testInsertRecord(deviceId, time, measurements, types, values);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("testInsertRecord failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in testInsertRecord", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -1974,24 +1891,21 @@ public class SessionPool implements ISessionPool {
   @Override
   public void deleteTimeseries(String path)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.deleteTimeseries(path);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("deleteTimeseries failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in deleteTimeseries", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.deleteTimeseries(path);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("deleteTimeseries failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in deleteTimeseries", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -2003,24 +1917,21 @@ public class SessionPool implements ISessionPool {
   @Override
   public void deleteTimeseries(List<String> paths)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.deleteTimeseries(paths);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("deleteTimeseries failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in deleteTimeseries", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.deleteTimeseries(paths);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("deleteTimeseries failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in deleteTimeseries", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -2033,24 +1944,21 @@ public class SessionPool implements ISessionPool {
   @Override
   public void deleteData(String path, long time)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.deleteData(path, time);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn(DELETE_DATA_FAIL, e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error(DELETE_DATA_ERROR_MSG, e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.deleteData(path, time);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn(DELETE_DATA_FAIL, e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error(DELETE_DATA_ERROR_MSG, e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -2063,24 +1971,21 @@ public class SessionPool implements ISessionPool {
   @Override
   public void deleteData(List<String> paths, long time)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.deleteData(paths, time);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn(DELETE_DATA_FAIL, e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error(DELETE_DATA_ERROR_MSG, e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.deleteData(paths, time);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn(DELETE_DATA_FAIL, e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error(DELETE_DATA_ERROR_MSG, e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -2094,24 +1999,21 @@ public class SessionPool implements ISessionPool {
   @Override
   public void deleteData(List<String> paths, long startTime, long endTime)
       throws IoTDBConnectionException, StatementExecutionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.deleteData(paths, startTime, endTime);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn(DELETE_DATA_FAIL, e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error(DELETE_DATA_ERROR_MSG, e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.deleteData(paths, startTime, endTime);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn(DELETE_DATA_FAIL, e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error(DELETE_DATA_ERROR_MSG, e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -3111,24 +3013,21 @@ public class SessionPool implements ISessionPool {
   @Override
   public void executeNonQueryStatement(String sql)
       throws StatementExecutionException, IoTDBConnectionException {
-    for (int i = 0; i < RETRY; i++) {
-      ISession session = getSession();
-      try {
-        session.executeNonQueryStatement(sql);
-        putBack(session);
-        return;
-      } catch (IoTDBConnectionException e) {
-        // TException means the connection is broken, remove it and get a new 
one.
-        LOGGER.warn("executeNonQueryStatement failed", e);
-        cleanSessionAndMayThrowConnectionException(session, i, e);
-      } catch (StatementExecutionException | RuntimeException e) {
-        putBack(session);
-        throw e;
-      } catch (Throwable e) {
-        LOGGER.error("unexpected error in executeNonQueryStatement", e);
-        putBack(session);
-        throw new RuntimeException(e);
-      }
+    ISession session = getSession();
+    try {
+      session.executeNonQueryStatement(sql);
+      putBack(session);
+    } catch (IoTDBConnectionException e) {
+      // TException means the connection is broken, remove it and get a new 
one.
+      LOGGER.warn("executeNonQueryStatement failed", e);
+      cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+    } catch (StatementExecutionException | RuntimeException e) {
+      putBack(session);
+      throw e;
+    } catch (Throwable e) {
+      LOGGER.error("unexpected error in executeNonQueryStatement", e);
+      putBack(session);
+      throw new RuntimeException(e);
     }
   }
 
@@ -3615,6 +3514,10 @@ public class SessionPool implements ISessionPool {
 
     private boolean enableAutoFetch;
 
+    private int maxRetryCount = SessionConfig.MAX_RETRY_COUNT;
+
+    private long retryIntervalInMs = SessionConfig.RETRY_INTERVAL_IN_MS;
+
     public Builder useSSL(boolean useSSL) {
       this.useSSL = useSSL;
       return this;
@@ -3710,6 +3613,16 @@ public class SessionPool implements ISessionPool {
       return this;
     }
 
+    public Builder maxRetryCount(int maxRetryCount) {
+      this.maxRetryCount = maxRetryCount;
+      return this;
+    }
+
+    public Builder retryIntervalInMs(long retryIntervalInMs) {
+      this.retryIntervalInMs = retryIntervalInMs;
+      return this;
+    }
+
     public SessionPool build() {
       return new SessionPool(this);
     }
diff --git 
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
 
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
index 75f75bc9243..61a5abd3865 100644
--- 
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
+++ 
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.session;
 import org.apache.iotdb.common.rpc.thrift.TAggregationType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.isession.SessionConfig;
 import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RedirectException;
@@ -169,7 +170,9 @@ public class SessionConnectionTest {
         new SessionConnection(
             session,
             ZoneId.systemDefault(),
-            () -> Collections.singletonList(new TEndPoint("local", 12)));
+            () -> Collections.singletonList(new TEndPoint("local", 12)),
+            SessionConfig.MAX_RETRY_COUNT,
+            SessionConfig.RETRY_INTERVAL_IN_MS);
   }
 
   @Test(expected = IoTDBConnectionException.class)
@@ -187,7 +190,9 @@ public class SessionConnectionTest {
             session,
             new TEndPoint("localhost", 1234),
             ZoneId.systemDefault(),
-            () -> Collections.singletonList(new TEndPoint("local", 12)));
+            () -> Collections.singletonList(new TEndPoint("local", 12)),
+            SessionConfig.MAX_RETRY_COUNT,
+            SessionConfig.RETRY_INTERVAL_IN_MS);
   }
 
   @Test
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 2f808350717..6c8c2d15bf9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -54,6 +54,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static org.apache.iotdb.commons.utils.StatusUtils.needRetry;
+
 /**
  * The coordinator for MPP. It manages all the queries which are executed in 
current Node. And it
  * will be responsible for the lifecycle of a query. A query request will be 
represented as a
@@ -135,7 +137,7 @@ public class Coordinator {
     QueryId globalQueryId = queryIdGenerator.createNextQueryId();
     MPPQueryContext queryContext = null;
     try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) {
-      if (sql != null && sql.length() > 0) {
+      if (sql != null && !sql.isEmpty()) {
         LOGGER.debug("[QueryStart] sql: {}", sql);
       }
       queryContext =
@@ -160,7 +162,12 @@ public class Coordinator {
         queryContext.setTimeOut(Long.MAX_VALUE);
       }
       execution.start();
-      return execution.getStatus();
+      ExecutionResult result = execution.getStatus();
+      if (!execution.isQuery() && result.status != null && 
needRetry(result.status)) {
+        // if it's write request and the result status needs to retry
+        result.status.setNeedRetry(true);
+      }
+      return result;
     } finally {
       int lockNums = queryContext.getAcquiredLockNum();
       if (queryContext != null && lockNums > 0) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
index aeff020a0da..a98194fccc4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.queryengine.plan.scheduler;
 
 import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
-import org.apache.iotdb.consensus.iot.client.DispatchLogHandler;
+import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeResp;
 import org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeResp;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -104,6 +104,6 @@ public class AsyncSendPlanNodeHandler implements 
AsyncMethodCallback<TSendBatchP
   }
 
   private boolean needRetry(TSendSinglePlanNodeResp resp) {
-    return !resp.accepted && DispatchLogHandler.needRetry(resp.status.code);
+    return !resp.accepted && resp.status != null && 
StatusUtils.needRetryHelper(resp.status);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index b48e8201629..c542bd09c8c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -284,8 +284,9 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
     return this.localhostIpAddr.equals(endPoint.getIp()) && 
localhostInternalPort == endPoint.port;
   }
 
-  private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
-      throws FragmentInstanceDispatchException {
+  private void dispatchRemoteHelper(FragmentInstance instance, TEndPoint 
endPoint)
+      throws FragmentInstanceDispatchException, TException, 
ClientManagerException,
+          RatisReadUnavailableException {
     try (SyncDataNodeInternalServiceClient client =
         syncInternalServiceClientManager.borrowClient(endPoint)) {
       switch (instance.getType()) {
@@ -348,18 +349,35 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
                   TSStatusCode.EXECUTE_STATEMENT_ERROR,
                   String.format("unknown read type [%s]", 
instance.getType())));
       }
+    }
+  }
+
+  private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
+      throws FragmentInstanceDispatchException {
+
+    try {
+      dispatchRemoteHelper(instance, endPoint);
     } catch (ClientManagerException | TException | 
RatisReadUnavailableException e) {
       logger.warn(
-          "can't execute request on node {}, error msg is {}.",
+          "can't execute request on node {}, error msg is {}, and we try to 
reconnect this node.",
           endPoint,
           ExceptionUtils.getRootCause(e).toString());
-      TSStatus status = new TSStatus();
-      status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
-      status.setMessage("can't connect to node " + endPoint);
-      // If the DataNode cannot be connected, its endPoint will be put into 
black list
-      // so that the following retry will avoid dispatching instance towards 
this DataNode.
-      queryContext.addFailedEndPoint(endPoint);
-      throw new FragmentInstanceDispatchException(status);
+      // we just retry once to clear stale connection for a restart node.
+      try {
+        dispatchRemoteHelper(instance, endPoint);
+      } catch (ClientManagerException | TException | 
RatisReadUnavailableException e1) {
+        logger.warn(
+            "can't execute request on node  {} in second try, error msg is 
{}.",
+            endPoint,
+            ExceptionUtils.getRootCause(e1).toString());
+        TSStatus status = new TSStatus();
+        status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+        status.setMessage("can't connect to node " + endPoint);
+        // If the DataNode cannot be connected, its endPoint will be put into 
black list
+        // so that the following retry will avoid dispatching instance towards 
this DataNode.
+        queryContext.addFailedEndPoint(endPoint);
+        throw new FragmentInstanceDispatchException(status);
+      }
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 9d17c3594ac..2bbd1bd0442 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -40,6 +40,8 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.ExecutionException;
 
+import static org.apache.iotdb.commons.utils.StatusUtils.needRetry;
+
 public class ErrorHandlingUtils {
 
   private ErrorHandlingUtils() {}
@@ -62,7 +64,17 @@ public class ErrorHandlingUtils {
     } else {
       LOGGER.warn(ERROR_OPERATION_LOG, statusCode, operation, e);
     }
-    return RpcUtils.getStatus(statusCode, message + e.getMessage());
+    if (e instanceof SemanticException) {
+      Throwable rootCause = getRootCause(e);
+      if (e.getCause() instanceof IoTDBException) {
+        return RpcUtils.getStatus(
+            ((IoTDBException) e.getCause()).getErrorCode(), 
rootCause.getMessage());
+      }
+      return RpcUtils.getStatus(TSStatusCode.SEMANTIC_ERROR, 
rootCause.getMessage());
+    }
+    TSStatus status = RpcUtils.getStatus(statusCode, message + e.getMessage());
+    status.setNeedRetry(needRetry(status));
+    return status;
   }
 
   public static TSStatus onNpeOrUnexpectedException(
@@ -91,6 +103,7 @@ public class ErrorHandlingUtils {
           LOGGER.warn(message, e);
         }
       }
+      status.setNeedRetry(needRetry(status));
       return status;
     } else {
       return onNpeOrUnexpectedException(e, operation, statusCode);
@@ -105,7 +118,7 @@ public class ErrorHandlingUtils {
     return onQueryException(e, operation.getName());
   }
 
-  public static TSStatus tryCatchQueryException(Exception e) {
+  private static TSStatus tryCatchQueryException(Exception e) {
     Throwable rootCause = getRootCause(e);
     // ignore logging sg not ready exception
     if (rootCause instanceof StorageGroupNotReadyException) {
@@ -146,16 +159,19 @@ public class ErrorHandlingUtils {
 
   public static TSStatus onNonQueryException(Exception e, String operation) {
     TSStatus status = tryCatchNonQueryException(e);
-    return status != null
-        ? status
-        : onNpeOrUnexpectedException(e, operation, 
TSStatusCode.INTERNAL_SERVER_ERROR);
+    if (status != null) {
+      status.setNeedRetry(needRetry(status));
+      return status;
+    } else {
+      return onNpeOrUnexpectedException(e, operation, 
TSStatusCode.INTERNAL_SERVER_ERROR);
+    }
   }
 
   public static TSStatus onNonQueryException(Exception e, OperationType 
operation) {
     return onNonQueryException(e, operation.getName());
   }
 
-  public static TSStatus tryCatchNonQueryException(Exception e) {
+  private static TSStatus tryCatchNonQueryException(Exception e) {
     String message = "Exception occurred while processing non-read. ";
     if (e instanceof BatchProcessException) {
       BatchProcessException batchException = (BatchProcessException) e;
@@ -184,7 +200,9 @@ public class ErrorHandlingUtils {
         String.format(
             "[%s] Exception occurred: %s failed. %s", statusCode, operation, 
e.getMessage());
     LOGGER.warn(ERROR_OPERATION_LOG, statusCode, operation, e);
-    return RpcUtils.getStatus(errorCode, message);
+    TSStatus status = RpcUtils.getStatus(errorCode, message);
+    status.setNeedRetry(needRetry(status));
+    return status;
   }
 
   public static TSStatus onIoTDBException(Exception e, OperationType 
operation, int errorCode) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 11fd755a8ca..c54a8325685 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -405,6 +405,10 @@ public class CommonConfig {
     return status == NodeStatus.ReadOnly;
   }
 
+  public boolean isRunning() {
+    return status == NodeStatus.Running;
+  }
+
   public NodeStatus getNodeStatus() {
     return status;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
index 83f041b0f6e..be32bf5a459 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
@@ -20,11 +20,15 @@
 package org.apache.iotdb.commons.utils;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 public class StatusUtils {
   private StatusUtils() {}
@@ -34,6 +38,29 @@ public class StatusUtils {
   public static final TSStatus EXECUTE_STATEMENT_ERROR =
       getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
 
+  private static final Set<Integer> NEED_RETRY = new HashSet<>();
+
+  private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
+
+  static {
+    NEED_RETRY.add(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+    NEED_RETRY.add(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+    NEED_RETRY.add(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+    NEED_RETRY.add(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
+    NEED_RETRY.add(TSStatusCode.STORAGE_ENGINE_NOT_READY.getStatusCode());
+    NEED_RETRY.add(TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode());
+    NEED_RETRY.add(TSStatusCode.WAL_ERROR.getStatusCode());
+    NEED_RETRY.add(TSStatusCode.DISK_SPACE_INSUFFICIENT.getStatusCode());
+    NEED_RETRY.add(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+    NEED_RETRY.add(TSStatusCode.INTERNAL_REQUEST_TIME_OUT.getStatusCode());
+    NEED_RETRY.add(TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode());
+    NEED_RETRY.add(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
+    NEED_RETRY.add(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode());
+    NEED_RETRY.add(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode());
+    NEED_RETRY.add(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode());
+    NEED_RETRY.add(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode());
+  }
+
   /**
    * @param statusMap index -> status
    * @param size the total number of status to generate
@@ -171,4 +198,29 @@ public class StatusUtils {
     }
     return status;
   }
+
+  public static boolean needRetry(TSStatus status) {
+    // always retry while node is in not running case
+    if (!COMMON_CONFIG.isRunning()) {
+      return true;
+    } else if (status == null) {
+      return false;
+    }
+    return needRetryHelper(status);
+  }
+
+  public static boolean needRetryHelper(TSStatus status) {
+    int code = status.getCode();
+    if (code == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+      for (TSStatus subStatus : status.subStatus) {
+        if (subStatus == null
+            || (subStatus.getCode() != OK.code && 
!NEED_RETRY.contains(subStatus.getCode()))) {
+          return false;
+        }
+      }
+      return true;
+    } else {
+      return NEED_RETRY.contains(code);
+    }
+  }
 }
diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift 
b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
index dbdb8bd406f..b574b52de09 100644
--- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
+++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@ -33,6 +33,7 @@ struct TSStatus {
   2: optional string message
   3: optional list<TSStatus> subStatus
   4: optional TEndPoint redirectNode
+  5: optional bool needRetry
 }
 
 enum TConsensusGroupType {

Reply via email to