This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch SessionRetry
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 11b9ca862f4ffa1d99591c200e4295fb787ec2e2
Author: JackieTien97 <[email protected]>
AuthorDate: Mon Oct 12 18:22:46 2020 +0800

    add session retry
---
 .../main/java/org/apache/iotdb/session/Config.java |  3 +-
 .../java/org/apache/iotdb/session/Session.java     | 96 +++++++++++++++++++---
 2 files changed, 87 insertions(+), 12 deletions(-)

diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/session/src/main/java/org/apache/iotdb/session/Config.java
index 325ae26..067b895 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -24,5 +24,6 @@ public class Config {
   public static final String DEFAULT_PASSWORD = "password";
   public static final int DEFAULT_FETCH_SIZE = 10000;
   public static final int DEFAULT_TIMEOUT_MS = 0;
-
+  public static final int RETRY_NUM = 3;
+  public static final long RETRY_INTERVAL = 1000;
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 4fc2404..17c38da 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -29,9 +29,6 @@ import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
 import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
@@ -40,12 +37,16 @@ import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
 import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -66,7 +67,6 @@ import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
 
 public class Session {
 
@@ -83,6 +83,8 @@ public class Session {
   private ZoneId zoneId;
   private long statementId;
   private int fetchSize;
+  private boolean enableRPCCompression;
+  private int connectionTimeoutInMs;
 
   public Session(String host, int rpcPort) {
     this(host, rpcPort, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD);
@@ -122,6 +124,9 @@ public class Session {
       return;
     }
 
+    this.enableRPCCompression = enableRPCCompression;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
+
     transport = new TFastFramedTransport(new TSocket(host, rpcPort, connectionTimeoutInMs));
 
     if (!transport.isOpen()) {
@@ -239,7 +244,15 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertTablet(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertTablet(request));
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status");
+      }
     }
   }
 
@@ -293,7 +306,15 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertTablets(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertTablets(request));
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status");
+      }
     }
   }
 
@@ -345,7 +366,15 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertRecords(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertRecords(request));
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status");
+      }
     }
   }
 
@@ -393,7 +422,15 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertStringRecords(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertStringRecords(request));
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status");
+      }
     }
   }
 
@@ -430,7 +467,15 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertRecord(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertRecord(request));
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status");
+      }
     }
   }
 
@@ -465,7 +510,15 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertStringRecord(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertStringRecord(request));
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status");
+      }
     }
   }
 
@@ -1057,4 +1110,25 @@ public class Session {
     }
   }
 
+  private boolean reconnect() { // true once the session has been reopened, false if every attempt failed
+    // Return on the first successful reopen so a working connection is never closed and reopened again.
+    for (int i = 1; i <= Config.RETRY_NUM; i++) { // at most RETRY_NUM attempts
+      try {
+        if (transport != null) { // nothing to re-establish when no transport was ever created
+          close();
+          open(enableRPCCompression, connectionTimeoutInMs); // reuse the settings captured by open()
+          return true;
+        }
+      } catch (Exception e) { // this attempt failed: wait RETRY_INTERVAL ms, then try again
+        try {
+          Thread.sleep(Config.RETRY_INTERVAL);
+        } catch (InterruptedException e1) {
+          logger.error("reconnect is interrupted.", e1);
+          Thread.currentThread().interrupt(); // restore the interrupt status for the caller
+        }
+      }
+    }
+    return false; // retries exhausted without a successful close()/open() cycle
+  }
+
 }

Reply via email to