This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 650bbcc25f7 [RTO/RPO] Unify retry logic on SessionConnection (#14894)
650bbcc25f7 is described below

commit 650bbcc25f7e24bf0942a90b3806a3ff3af555af
Author: William Song <[email protected]>
AuthorDate: Fri Feb 21 15:37:56 2025 +0800

    [RTO/RPO] Unify retry logic on SessionConnection (#14894)
    
    * refactor replications in session connection
    
    * fix npe error
    
    * change version to v2
    
    * address review issues
---
 .../apache/iotdb/session/SessionConnection.java    | 1019 +++++++-------------
 .../apache/iotdb/session/util/CheckedSupplier.java |   32 -
 2 files changed, 323 insertions(+), 728 deletions(-)

diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 31abf69ec2e..9dbbbc343e2 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -63,7 +63,6 @@ import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
-import org.apache.iotdb.session.util.CheckedSupplier;
 import org.apache.iotdb.session.util.SessionUtils;
 
 import org.apache.thrift.TException;
@@ -72,6 +71,7 @@ import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -287,12 +287,14 @@ public class SessionConnection {
 
   protected void setTimeZone(String zoneId)
       throws StatementExecutionException, IoTDBConnectionException {
-    doOperation(
-        () -> {
-          TSSetTimeZoneReq req = new TSSetTimeZoneReq(sessionId, zoneId);
-          RpcUtils.verifySuccess(client.setTimeZone(req));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  TSSetTimeZoneReq req = new TSSetTimeZoneReq(sessionId, zoneId);
+                  return client.setTimeZone(req);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
     setTimeZoneOfSession(zoneId);
   }
 
@@ -309,50 +311,52 @@ public class SessionConnection {
 
   protected void setStorageGroup(String storageGroup)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          RpcUtils.verifySuccess(client.setStorageGroup(sessionId, storageGroup));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(() -> client.setStorageGroup(sessionId, storageGroup)).getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void deleteStorageGroups(List<String> storageGroups)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId, storageGroups));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(() -> client.deleteStorageGroups(sessionId, storageGroups)).getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void createTimeseries(TSCreateTimeseriesReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.createTimeseries(request));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.createTimeseries(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void createAlignedTimeseries(TSCreateAlignedTimeseriesReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.createAlignedTimeseries(request));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.createAlignedTimeseries(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void createMultiTimeseries(TSCreateMultiTimeseriesReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.createMultiTimeseries(request));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.createMultiTimeseries(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected boolean checkTimeseriesExists(String path, long timeout)
@@ -377,26 +381,22 @@ public class SessionConnection {
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
     execReq.setFetchSize(session.fetchSize);
     execReq.setTimeout(timeout);
-    TSExecuteStatementResp execResp;
-    try {
-      execReq.setEnableRedirectQuery(enableRedirect);
-      execResp = client.executeQueryStatementV2(execReq);
+    execReq.setEnableRedirectQuery(enableRedirect);
+
+    RetryResult<TSExecuteStatementResp> result =
+        callWithReconnect(
+            () -> {
+              execReq.setSessionId(sessionId);
+              execReq.setStatementId(statementId);
+              return client.executeQueryStatementV2(execReq);
+            });
+    TSExecuteStatementResp execResp = result.getResult();
+    if (result.getRetryAttempts() == 0) {
       RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          execReq.setSessionId(sessionId);
-          execReq.setStatementId(statementId);
-          execResp = client.executeQueryStatementV2(execReq);
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
+    } else {
+      RpcUtils.verifySuccess(execResp.getStatus());
     }
 
-    RpcUtils.verifySuccess(execResp.getStatus());
     return new SessionDataSet(
         sql,
         execResp.getColumns(),
@@ -419,49 +419,8 @@ public class SessionConnection {
 
   protected void executeNonQueryStatement(String sql)
       throws IoTDBConnectionException, StatementExecutionException {
-
     TSExecuteStatementReq request = new TSExecuteStatementReq(sessionId, sql, 
statementId);
-
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = executeNonQueryStatementInternal(request);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        RpcUtils.verifySuccess(status);
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
-
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    callWithRetryAndVerify(() -> executeNonQueryStatementInternal(request));
   }
 
   private TSStatus executeNonQueryStatementInternal(TSExecuteStatementReq 
request)
@@ -491,26 +450,23 @@ public class SessionConnection {
         new TSRawDataQueryReq(sessionId, paths, startTime, endTime, 
statementId);
     execReq.setFetchSize(session.fetchSize);
     execReq.setTimeout(timeOut);
-    TSExecuteStatementResp execResp;
-    try {
-      execReq.setEnableRedirectQuery(enableRedirect);
-      execResp = client.executeRawDataQueryV2(execReq);
+    execReq.setEnableRedirectQuery(enableRedirect);
+
+    RetryResult<TSExecuteStatementResp> result =
+        callWithReconnect(
+            () -> {
+              execReq.setSessionId(sessionId);
+              execReq.setStatementId(statementId);
+              return client.executeRawDataQueryV2(execReq);
+            });
+
+    TSExecuteStatementResp execResp = result.getResult();
+    if (result.getRetryAttempts() == 0) {
       RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          execReq.setSessionId(sessionId);
-          execReq.setStatementId(statementId);
-          execResp = client.executeRawDataQueryV2(execReq);
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
+    } else {
+      RpcUtils.verifySuccess(execResp.getStatus());
     }
 
-    RpcUtils.verifySuccess(execResp.getStatus());
     return new SessionDataSet(
         "",
         execResp.getColumns(),
@@ -538,28 +494,27 @@ public class SessionConnection {
     req.setEnableRedirectQuery(enableRedirect);
     req.setLegalPathNodes(isLegalPathNodes);
     req.setTimeout(timeOut);
-    TSExecuteStatementResp tsExecuteStatementResp = null;
     TEndPoint redirectedEndPoint = null;
-    try {
-      tsExecuteStatementResp = client.executeFastLastDataQueryForOneDeviceV2(req);
-      
RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
-    } catch (RedirectException e) {
-      redirectedEndPoint = e.getEndPoint();
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          req.setSessionId(sessionId);
-          req.setStatementId(statementId);
-          tsExecuteStatementResp = 
client.executeFastLastDataQueryForOneDeviceV2(req);
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
+
+    RetryResult<TSExecuteStatementResp> result =
+        callWithReconnect(
+            () -> {
+              req.setSessionId(sessionId);
+              req.setStatementId(statementId);
+              return client.executeFastLastDataQueryForOneDeviceV2(req);
+            });
+
+    TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
+    if (result.getRetryAttempts() == 0) {
+      try {
+        RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
+      } catch (RedirectException e) {
+        redirectedEndPoint = e.getEndPoint();
       }
+    } else {
+      RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
     }
 
-    RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
     return new Pair<>(
         new SessionDataSet(
             "",
@@ -587,25 +542,22 @@ public class SessionConnection {
     tsLastDataQueryReq.setFetchSize(session.fetchSize);
     tsLastDataQueryReq.setEnableRedirectQuery(enableRedirect);
     tsLastDataQueryReq.setTimeout(timeOut);
-    TSExecuteStatementResp tsExecuteStatementResp;
-    try {
-      tsExecuteStatementResp = 
client.executeLastDataQueryV2(tsLastDataQueryReq);
+
+    RetryResult<TSExecuteStatementResp> result =
+        callWithReconnect(
+            () -> {
+              tsLastDataQueryReq.setSessionId(sessionId);
+              tsLastDataQueryReq.setStatementId(statementId);
+              return client.executeLastDataQueryV2(tsLastDataQueryReq);
+            });
+    final TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
+
+    if (result.getRetryAttempts() == 0) {
       
RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          tsLastDataQueryReq.setSessionId(sessionId);
-          tsLastDataQueryReq.setStatementId(statementId);
-          tsExecuteStatementResp = 
client.executeLastDataQueryV2(tsLastDataQueryReq);
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
+    } else {
+      RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
     }
 
-    RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
     return new SessionDataSet(
         "",
         tsExecuteStatementResp.getColumns(),
@@ -672,25 +624,21 @@ public class SessionConnection {
 
   private SessionDataSet executeAggregationQuery(TSAggregationQueryReq 
tsAggregationQueryReq)
       throws StatementExecutionException, IoTDBConnectionException, 
RedirectException {
-    TSExecuteStatementResp tsExecuteStatementResp;
-    try {
-      tsExecuteStatementResp = 
client.executeAggregationQueryV2(tsAggregationQueryReq);
+    RetryResult<TSExecuteStatementResp> result =
+        callWithReconnect(
+            () -> {
+              tsAggregationQueryReq.setSessionId(sessionId);
+              tsAggregationQueryReq.setStatementId(statementId);
+              return client.executeAggregationQueryV2(tsAggregationQueryReq);
+            });
+
+    TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
+    if (result.getRetryAttempts() == 0) {
       
RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          tsAggregationQueryReq.setSessionId(sessionId);
-          tsAggregationQueryReq.setStatementId(statementId);
-          tsExecuteStatementResp = 
client.executeAggregationQuery(tsAggregationQueryReq);
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
+    } else {
+      RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
     }
 
-    RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
     return new SessionDataSet(
         "",
         tsExecuteStatementResp.getColumns(),
@@ -720,52 +668,7 @@ public class SessionConnection {
 
   protected void insertRecord(TSInsertRecordReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = insertRecordInternal(request);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        if (i == 0) {
-          // first time succeed, take account for redirection info
-          RpcUtils.verifySuccessWithRedirection(status);
-        } else {
-          // if it's retry, just ignore redirection info
-          RpcUtils.verifySuccess(status);
-        }
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
-
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    callWithRetryAndVerifyWithRedirection(() -> insertRecordInternal(request));
   }
 
   private TSStatus insertRecordInternal(TSInsertRecordReq request) throws 
TException {
@@ -775,52 +678,7 @@ public class SessionConnection {
 
   protected void insertRecord(TSInsertStringRecordReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = insertRecordInternal(request);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        if (i == 0) {
-          // first time succeed, take account for redirection info
-          RpcUtils.verifySuccessWithRedirection(status);
-        } else {
-          // if it's retry, just ignore redirection info
-          RpcUtils.verifySuccess(status);
-        }
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
-
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    callWithRetryAndVerifyWithRedirection(() -> insertRecordInternal(request));
   }
 
   private TSStatus insertRecordInternal(TSInsertStringRecordReq request) 
throws TException {
@@ -830,52 +688,8 @@ public class SessionConnection {
 
   protected void insertRecords(TSInsertRecordsReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = insertRecordsInternal(request);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        if (i == 0) {
-          // first time succeed, take account for redirection info
-          RpcUtils.verifySuccessWithRedirectionForMultiDevices(status, 
request.getPrefixPaths());
-        } else {
-          // if it's retry, just ignore redirection info
-          RpcUtils.verifySuccess(status);
-        }
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
-
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    callWithRetryAndVerifyWithRedirectionForMultipleDevices(
+        () -> insertRecordsInternal(request), request::getPrefixPaths);
   }
 
   private TSStatus insertRecordsInternal(TSInsertRecordsReq request) throws 
TException {
@@ -885,53 +699,8 @@ public class SessionConnection {
 
   protected void insertRecords(TSInsertStringRecordsReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = insertRecordsInternal(request);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        if (i == 0) {
-          // first time succeed, take account for redirection info
-          RpcUtils.verifySuccessWithRedirectionForMultiDevices(status, 
request.getPrefixPaths());
-        } else {
-          // if it's retry, just ignore redirection info
-          RpcUtils.verifySuccess(status);
-        }
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
-
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    callWithRetryAndVerifyWithRedirectionForMultipleDevices(
+        () -> insertRecordsInternal(request), request::getPrefixPaths);
   }
 
   private TSStatus insertRecordsInternal(TSInsertStringRecordsReq request) 
throws TException {
@@ -941,53 +710,7 @@ public class SessionConnection {
 
   protected void insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq 
request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = insertRecordsOfOneDeviceInternal(request);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        if (i == 0) {
-          // first time succeed, take account for redirection info
-          RpcUtils.verifySuccessWithRedirection(status);
-        } else {
-          // if it's retry, just ignore redirection info
-          RpcUtils.verifySuccess(status);
-        }
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
-
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    callWithRetryAndVerifyWithRedirection(() -> 
insertRecordsOfOneDeviceInternal(request));
   }
 
   private TSStatus 
insertRecordsOfOneDeviceInternal(TSInsertRecordsOfOneDeviceReq request)
@@ -998,53 +721,7 @@ public class SessionConnection {
 
   protected void 
insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = insertStringRecordsOfOneDeviceInternal(request);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        if (i == 0) {
-          // first time succeed, take account for redirection info
-          RpcUtils.verifySuccessWithRedirection(status);
-        } else {
-          // if it's retry, just ignore redirection info
-          RpcUtils.verifySuccess(status);
-        }
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
-
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    callWithRetryAndVerifyWithRedirection(() -> 
insertStringRecordsOfOneDeviceInternal(request));
   }
 
   private TSStatus insertStringRecordsOfOneDeviceInternal(
@@ -1053,57 +730,38 @@ public class SessionConnection {
     return client.insertStringRecordsOfOneDevice(request);
   }
 
-  protected void withRetry(TFunction<TSStatus> function)
+  private void callWithRetryAndVerifyWithRedirectionForMultipleDevices(
+      TFunction<TSStatus> function, Supplier<List<String>> pathSupplier)
       throws StatementExecutionException, RedirectException, 
IoTDBConnectionException {
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          logger.warn(
-              "Thread {} was interrupted during retry {} with wait time {} ms. 
Exiting retry loop.",
-              Thread.currentThread().getName(),
-              i,
-              retryIntervalInMs);
-          break;
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = function.run();
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        if (i == 0) {
-          // first time succeed, take account for redirection info
-          RpcUtils.verifySuccessWithRedirection(status);
-        } else {
-          // if it's retry, just ignore redirection info
-          RpcUtils.verifySuccess(status);
-        }
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
+    RetryResult<TSStatus> result = callWithRetry(function);
+
+    TSStatus status = result.getResult();
+    if (status != null) {
+      if (result.getRetryAttempts() == 0) {
+        RpcUtils.verifySuccessWithRedirectionForMultiDevices(status, 
pathSupplier.get());
+      } else {
+        RpcUtils.verifySuccess(status);
       }
+    } else if (result.getException() != null) {
+      throw new IoTDBConnectionException(result.getException());
+    } else {
+      throw new IoTDBConnectionException(logForReconnectionFailure());
     }
+  }
+
+  private void callWithRetryAndVerifyWithRedirection(TFunction<TSStatus> 
function)
+      throws StatementExecutionException, RedirectException, 
IoTDBConnectionException {
+    RetryResult<TSStatus> result = callWithRetry(function);
 
+    TSStatus status = result.getResult();
     if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
+      if (result.getRetryAttempts() == 0) {
+        RpcUtils.verifySuccessWithRedirection(status);
+      } else {
+        RpcUtils.verifySuccess(status);
+      }
+    } else if (result.getException() != null) {
+      throw new IoTDBConnectionException(result.getException());
     } else {
       throw new IoTDBConnectionException(logForReconnectionFailure());
     }
@@ -1111,7 +769,7 @@ public class SessionConnection {
 
   protected void insertTablet(TSInsertTabletReq request)
       throws IoTDBConnectionException, StatementExecutionException, 
RedirectException {
-    withRetry(() -> insertTabletInternal(request));
+    callWithRetryAndVerifyWithRedirection(() -> insertTabletInternal(request));
   }
 
   private TSStatus insertTabletInternal(TSInsertTabletReq request) throws 
TException {
@@ -1121,53 +779,8 @@ public class SessionConnection {
 
   protected void insertTablets(TSInsertTabletsReq request)
       throws IoTDBConnectionException, StatementExecutionException, RedirectException {
-
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = insertTabletsInternal(request);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        if (i == 0) {
-          // first time succeed, take account for redirection info
-          RpcUtils.verifySuccessWithRedirectionForMultiDevices(status, request.getPrefixPaths());
-        } else {
-          // if it's retry, just ignore redirection info
-          RpcUtils.verifySuccess(status);
-        }
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
-
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    callWithRetryAndVerifyWithRedirectionForMultipleDevices(
+        () -> insertTabletsInternal(request), request::getPrefixPaths);
   }
 
   private TSStatus insertTabletsInternal(TSInsertTabletsReq request) throws TException {
@@ -1177,55 +790,31 @@ public class SessionConnection {
 
   protected void deleteTimeseries(List<String> paths)
       throws IoTDBConnectionException, StatementExecutionException {
+    callWithRetryAndVerify(() -> client.deleteTimeseries(sessionId, paths));
+  }
 
-    TException lastTException = null;
-    TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
-      if (i > 0) {
-        // re-init the TException and TSStatus
-        lastTException = null;
-        status = null;
-        // not first time, we need to sleep and then reconnect
-        try {
-          TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
-        } catch (InterruptedException e) {
-          // just ignore
-        }
-        if (!reconnect()) {
-          // reconnect failed, just continue to make another retry.
-          continue;
-        }
-      }
-      try {
-        status = client.deleteTimeseries(sessionId, paths);
-        // need retry
-        if (status.isSetNeedRetry() && status.isNeedRetry()) {
-          continue;
-        }
-        // succeed or don't need to retry
-        RpcUtils.verifySuccess(status);
-        return;
-      } catch (TException e) {
-        // all network exception need retry until reaching maxRetryCount
-        lastTException = e;
-      }
-    }
+  public void deleteData(TSDeleteDataReq request)
+      throws IoTDBConnectionException, StatementExecutionException {
+    callWithRetryAndVerify(() -> deleteDataInternal(request));
+  }
 
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
+  private void callWithRetryAndVerify(TFunction<TSStatus> rpc)
+      throws IoTDBConnectionException, StatementExecutionException {
+    RetryResult<TSStatus> result = callWithRetry(rpc);
+    if (result.getResult() != null) {
+      RpcUtils.verifySuccess(result.getResult());
+    } else if (result.getException() != null) {
+      throw new IoTDBConnectionException(result.getException());
     } else {
       throw new IoTDBConnectionException(logForReconnectionFailure());
     }
   }
 
-  public void deleteData(TSDeleteDataReq request)
-      throws IoTDBConnectionException, StatementExecutionException {
-
+  private RetryResult<TSStatus> callWithRetry(TFunction<TSStatus> rpc) {
     TException lastTException = null;
     TSStatus status = null;
-    for (int i = 0; i <= maxRetryCount; i++) {
+    int i;
+    for (i = 0; i <= maxRetryCount; i++) {
       if (i > 0) {
         // re-init the TException and TSStatus
         lastTException = null;
@@ -1234,7 +823,13 @@ public class SessionConnection {
         try {
           TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
         } catch (InterruptedException e) {
-          // just ignore
+          Thread.currentThread().interrupt();
+          logger.warn(
+              "Thread {} was interrupted during retry {} with wait time {} ms. Exiting retry loop.",
+              Thread.currentThread().getName(),
+              i,
+              retryIntervalInMs);
+          break;
         }
         if (!reconnect()) {
           // reconnect failed, just continue to make another retry.
@@ -1242,27 +837,19 @@ public class SessionConnection {
         }
       }
       try {
-        status = deleteDataInternal(request);
+        status = rpc.run();
         // need retry
         if (status.isSetNeedRetry() && status.isNeedRetry()) {
           continue;
         }
-        // succeed or don't need to retry
-        RpcUtils.verifySuccess(status);
-        return;
+        break;
       } catch (TException e) {
         // all network exception need retry until reaching maxRetryCount
         lastTException = e;
       }
     }
 
-    if (status != null) {
-      RpcUtils.verifySuccess(status);
-    } else if (lastTException != null) {
-      throw new IoTDBConnectionException(lastTException);
-    } else {
-      throw new IoTDBConnectionException(logForReconnectionFailure());
-    }
+    return new RetryResult<>(status, lastTException, i);
   }
 
   private TSStatus deleteDataInternal(TSDeleteDataReq request) throws TException {
@@ -1272,62 +859,74 @@ public class SessionConnection {
 
   protected void testInsertRecord(TSInsertStringRecordReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.testInsertStringRecord(request));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.testInsertStringRecord(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void testInsertRecord(TSInsertRecordReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.testInsertRecord(request));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.testInsertRecord(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   public void testInsertRecords(TSInsertStringRecordsReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.testInsertStringRecords(request));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.testInsertStringRecords(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   public void testInsertRecords(TSInsertRecordsReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.testInsertRecords(request));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.testInsertRecords(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void testInsertTablet(TSInsertTabletReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.testInsertTablet(request));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.testInsertTablet(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void testInsertTablets(TSInsertTabletsReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.testInsertTablets(request));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.testInsertTablets(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   @SuppressWarnings({
@@ -1380,105 +979,121 @@ public class SessionConnection {
 
   protected void createSchemaTemplate(TSCreateSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.createSchemaTemplate(request));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.createSchemaTemplate(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void appendSchemaTemplate(TSAppendSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.appendSchemaTemplate(request));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.appendSchemaTemplate(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void pruneSchemaTemplate(TSPruneSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.pruneSchemaTemplate(request));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.pruneSchemaTemplate(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req)
       throws StatementExecutionException, IoTDBConnectionException {
-    return doOperation(
-        () -> {
-          req.setSessionId(sessionId);
-          TSQueryTemplateResp execResp = client.querySchemaTemplate(req);
-          RpcUtils.verifySuccess(execResp.getStatus());
-          return execResp;
-        });
+    final TSQueryTemplateResp execResp =
+        callWithReconnect(
+                () -> {
+                  req.setSessionId(sessionId);
+                  return client.querySchemaTemplate(req);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(execResp.getStatus());
+    return execResp;
   }
 
   protected void setSchemaTemplate(TSSetSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.setSchemaTemplate(request));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.setSchemaTemplate(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void unsetSchemaTemplate(TSUnsetSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.unsetSchemaTemplate(request));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.unsetSchemaTemplate(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void dropSchemaTemplate(TSDropSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.dropSchemaTemplate(request));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.dropSchemaTemplate(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected void createTimeseriesUsingSchemaTemplate(
       TCreateTimeseriesUsingSchemaTemplateReq request)
       throws IoTDBConnectionException, StatementExecutionException {
-    doOperation(
-        () -> {
-          request.setSessionId(sessionId);
-          RpcUtils.verifySuccess(client.createTimeseriesUsingSchemaTemplate(request));
-          return null;
-        });
+    final TSStatus status =
+        callWithReconnect(
+                () -> {
+                  request.setSessionId(sessionId);
+                  return client.createTimeseriesUsingSchemaTemplate(request);
+                })
+            .getResult();
+    RpcUtils.verifySuccess(status);
   }
 
   protected TSBackupConfigurationResp getBackupConfiguration()
       throws IoTDBConnectionException, StatementExecutionException {
-    return doOperation(
-        () -> {
-          TSBackupConfigurationResp execResp = client.getBackupConfiguration();
-          RpcUtils.verifySuccess(execResp.getStatus());
-          return execResp;
-        });
+    final TSBackupConfigurationResp execResp =
+        callWithReconnect(() -> client.getBackupConfiguration()).getResult();
+    RpcUtils.verifySuccess(execResp.getStatus());
+    return execResp;
   }
 
-  private <RETURN> RETURN doOperation(CheckedSupplier<RETURN, TException> supplier)
-      throws IoTDBConnectionException, StatementExecutionException {
-    RETURN ret;
+  private <T> RetryResult<T> callWithReconnect(TFunction<T> supplier)
+      throws IoTDBConnectionException {
+    T ret;
     try {
-      ret = supplier.get();
+      ret = supplier.run();
+      return new RetryResult<>(ret, null, 0);
     } catch (TException e) {
       if (reconnect()) {
         try {
-          ret = supplier.get();
+          ret = supplier.run();
+          return new RetryResult<>(ret, null, 1);
         } catch (TException tException) {
           throw new IoTDBConnectionException(tException);
         }
@@ -1486,23 +1101,10 @@ public class SessionConnection {
         throw new IoTDBConnectionException(logForReconnectionFailure());
       }
     }
-    return ret;
   }
 
   public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
-    try {
-      return client.fetchAllConnectionsInfo();
-    } catch (TException e) {
-      if (reconnect()) {
-        try {
-          return client.fetchAllConnectionsInfo();
-        } catch (TException tException) {
-          throw new IoTDBConnectionException(tException);
-        }
-      } else {
-        throw new IoTDBConnectionException(logForReconnectionFailure());
-      }
-    }
+    return callWithReconnect(() -> client.fetchAllConnectionsInfo()).getResult();
   }
 
   public boolean isEnableRedirect() {
@@ -1544,4 +1146,29 @@ public class SessionConnection {
   private interface TFunction<T> {
     T run() throws TException;
   }
+
+  private static class RetryResult<T> {
+    private final T result;
+    private final TException exception;
+    private final int retryAttempts;
+
+    public RetryResult(T result, TException exception, int retryAttempts) {
+      Preconditions.checkArgument(result == null || exception == null);
+      this.result = result;
+      this.exception = exception;
+      this.retryAttempts = retryAttempts;
+    }
+
+    public int getRetryAttempts() {
+      return retryAttempts;
+    }
+
+    public TException getException() {
+      return exception;
+    }
+
+    public T getResult() {
+      return result;
+    }
+  }
 }
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/CheckedSupplier.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/CheckedSupplier.java
deleted file mode 100644
index 93949524afb..00000000000
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/CheckedSupplier.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.session.util;
-
-import org.apache.iotdb.rpc.StatementExecutionException;
-
-/** Supplier with a throws-clause. */
-@FunctionalInterface
-public interface CheckedSupplier<OUTPUT, THROWABLE extends Throwable> {
-  /**
-   * The same as {@link java.util.function.Supplier#get()} except that this method is declared with
-   * a throws-clause.
-   */
-  OUTPUT get() throws THROWABLE, StatementExecutionException;
-}

Reply via email to