This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 48f01b7  [IOTDB-926] Support reconnection of Session (#1821)
48f01b7 is described below

commit 48f01b7878bfaa4f11916f92b257e22f7e32c644
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Oct 21 03:43:04 2020 +0800

    [IOTDB-926] Support reconnection of Session (#1821)
---
 .../main/java/org/apache/iotdb/jdbc/Config.java    |   2 +-
 .../org/apache/iotdb/jdbc/IoTDBConnection.java     |   2 +-
 .../main/java/org/apache/iotdb/session/Config.java |   3 +-
 .../java/org/apache/iotdb/session/Session.java     | 131 ++++++++++++++++++---
 4 files changed, 119 insertions(+), 19 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java 
b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
index 2cd4ff8..9025607 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
@@ -45,7 +45,7 @@ public class Config {
   static final String DEFALUT_PASSWORD = "password";
 
   static final int RETRY_NUM = 3;
-  static final long RETRY_INTERVAL = 1000;
+  static final long RETRY_INTERVAL_MS = 1000;
 
   static int fetchSize = 10000;
   static int connectionTimeoutInMs = 0;
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java 
b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index e36fd80..b217bb9 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -481,7 +481,7 @@ public class IoTDBConnection implements Connection {
         }
       } catch (Exception e) {
         try {
-          Thread.sleep(Config.RETRY_INTERVAL);
+          Thread.sleep(Config.RETRY_INTERVAL_MS);
         } catch (InterruptedException e1) {
           logger.error("reconnect is interrupted.", e1);
           Thread.currentThread().interrupt();
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java 
b/session/src/main/java/org/apache/iotdb/session/Config.java
index 325ae26..b7cfb24 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -24,5 +24,6 @@ public class Config {
   public static final String DEFAULT_PASSWORD = "password";
   public static final int DEFAULT_FETCH_SIZE = 10000;
   public static final int DEFAULT_TIMEOUT_MS = 0;
-
+  public static final int RETRY_NUM = 3;
+  public static final long RETRY_INTERVAL_MS = 1000;
 }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java 
b/session/src/main/java/org/apache/iotdb/session/Session.java
index 4fc2404..036caa0 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -29,9 +29,6 @@ import org.apache.iotdb.rpc.BatchExecutionException;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
 import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
@@ -40,12 +37,16 @@ import 
org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
 import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -66,7 +67,6 @@ import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
 
 public class Session {
 
@@ -83,6 +83,8 @@ public class Session {
   private ZoneId zoneId;
   private long statementId;
   private int fetchSize;
+  private boolean enableRPCCompression;
+  private int connectionTimeoutInMs;
 
   public Session(String host, int rpcPort) {
     this(host, rpcPort, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD);
@@ -122,6 +124,9 @@ public class Session {
       return;
     }
 
+    this.enableRPCCompression = enableRPCCompression;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
+
     transport = new TFastFramedTransport(new TSocket(host, rpcPort, 
connectionTimeoutInMs));
 
     if (!transport.isOpen()) {
@@ -239,7 +244,15 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertTablet(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertTablet(request));
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server. 
Please check server status");
+      }
     }
   }
 
@@ -293,7 +306,15 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertTablets(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertTablets(request));
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server. 
Please check server status");
+      }
     }
   }
 
@@ -345,7 +366,15 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertRecords(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertRecords(request));
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server. 
Please check server status");
+      }
     }
   }
 
@@ -393,7 +422,15 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertStringRecords(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertStringRecords(request));
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server. 
Please check server status");
+      }
     }
   }
 
@@ -430,7 +467,15 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertRecord(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertRecord(request));
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server. 
Please check server status");
+      }
     }
   }
 
@@ -465,7 +510,15 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertStringRecord(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertStringRecord(request));
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server. 
Please check server status");
+      }
     }
   }
 
@@ -909,7 +962,15 @@ public class Session {
     try {
       execResp = client.executeQueryStatement(execReq);
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          execResp = client.executeQueryStatement(execReq);
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server. 
Please check server status");
+      }
     }
 
     RpcUtils.verifySuccess(execResp.getStatus());
@@ -927,12 +988,21 @@ public class Session {
   public void executeNonQueryStatement(String sql)
       throws IoTDBConnectionException, StatementExecutionException {
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, 
statementId);
+    TSExecuteStatementResp execResp;
     try {
-      TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
-      RpcUtils.verifySuccess(execResp.getStatus());
+      execResp = client.executeUpdateStatement(execReq);
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          execResp = client.executeUpdateStatement(execReq);
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server. 
Please check server status");
+      }
     }
+    RpcUtils.verifySuccess(execResp.getStatus());
   }
 
   /**
@@ -957,7 +1027,15 @@ public class Session {
     try {
       execResp = client.executeRawDataQuery(execReq);
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          execResp = client.executeRawDataQuery(execReq);
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server. 
Please check server status");
+      }
     }
 
     RpcUtils.verifySuccess(execResp.getStatus());
@@ -1057,4 +1135,25 @@ public class Session {
     }
   }
 
+  private boolean reconnect() {
+    boolean flag = false;
+    for (int i = 1; i <= Config.RETRY_NUM; i++) {
+      try {
+        if (transport != null) {
+          close();
+          open(enableRPCCompression, connectionTimeoutInMs);
+          flag = true;
+        }
+      } catch (Exception e) {
+        try {
+          Thread.sleep(Config.RETRY_INTERVAL_MS);
+        } catch (InterruptedException e1) {
+          logger.error("reconnect is interrupted.", e1);
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+    return flag;
+  }
+
 }

Reply via email to