This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch add_retry_in_session
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 0c405e84998f55fbc4e5432ead9079ef3e6bd0f5
Author: qiaojialin <[email protected]>
AuthorDate: Tue May 12 14:42:03 2020 +0800

    add retry in session and jdbc.execute()
---
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java |  14 +-
 .../java/org/apache/iotdb/session/Session.java     | 162 +++++++++++++++++++--
 2 files changed, 157 insertions(+), 19 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index fdd81bc..0096cdb 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -174,11 +174,17 @@ public class IoTDBStatement implements Statement {
       return executeSQL(sql);
     } catch (TException e) {
       if (reConnect()) {
-        throw new SQLException(String.format("Fail to execute %s", sql), e);
+        try {
+          return executeSQL(sql);
+        } catch (TException e2) {
+          throw new SQLException(
+              "Fail to execute sql " + sql + "after reconnecting. please check 
server status",
+              e2);
+        }
       } else {
-        throw new SQLException(String
-                .format("Fail to reconnect to server when executing %s. please 
check server status",
-                        sql), e);
+        throw new SQLException(
+            "Fail to reconnect to server when execute sql " + sql
+                + ". please check server status", e);
       }
     }
   }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 07245e6..0ffb4b4 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -75,6 +75,10 @@ public class Session {
   private ZoneId zoneId;
   private long statementId;
   private int fetchSize;
+  private int connectionTimeoutInMs = Config.DEFAULT_TIMEOUT_MS;
+  private boolean enableRPCCompression = false;
+  private final int RETRY_NUM = 2;
+  private final int RETRY_INTERVAL = 1000;
 
   public Session(String host, int port) {
     this(host, port, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD);
@@ -113,6 +117,8 @@ public class Session {
     if (!isClosed) {
       return;
     }
+    this.enableRPCCompression = enableRPCCompression;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
     transport = new TSocket(host, port, connectionTimeoutInMs);
     if (!transport.isOpen()) {
       try {
@@ -185,6 +191,26 @@ public class Session {
     }
   }
 
+  /**
+   * Tries to re-establish the connection to the server after an RPC failure.
+   *
+   * <p>Each attempt closes the current transport (if any), marks the session as closed so that
+   * {@code open} does not return early, and calls {@code open} with the stored compression and
+   * timeout settings. At most {@code RETRY_NUM} attempts are made, sleeping
+   * {@code RETRY_INTERVAL} ms after each failed attempt.
+   *
+   * @return {@code true} as soon as one attempt succeeds; {@code false} if every attempt fails
+   *     or the thread is interrupted while waiting to retry
+   */
+  private boolean reconnect() {
+    for (int i = 1; i <= RETRY_NUM; i++) {
+      try {
+        if (transport != null) {
+          transport.close();
+        }
+        // open() is a no-op unless the session is flagged as closed
+        isClosed = true;
+        open(enableRPCCompression, connectionTimeoutInMs);
+        // connected: report success instead of closing and re-opening again
+        return true;
+      } catch (Exception e) {
+        // this attempt failed; wait before the next one
+        try {
+          Thread.sleep(RETRY_INTERVAL);
+        } catch (InterruptedException e1) {
+          logger.error("reconnect is interrupted.", e1);
+          // restore the interrupt flag for the caller and stop retrying
+          Thread.currentThread().interrupt();
+          return false;
+        }
+      }
+    }
+    return false;
+  }
+
   /**
    * insert data in one row, if you want to improve your performance, please use insertRecords method
    * or insertTablet method
@@ -221,7 +247,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertRecord(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertRecord(request));
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -276,7 +311,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertTablet(request).statusList);
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertTablet(request).statusList);
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -331,7 +375,16 @@ public class Session {
       try {
         RpcUtils.verifySuccess(client.insertTablets(request).statusList);
       } catch (TException e) {
-        throw new IoTDBConnectionException(e);
+        if (reconnect()) {
+          try {
+            RpcUtils.verifySuccess(client.insertTablets(request).statusList);
+          } catch (TException e1) {
+            throw new IoTDBConnectionException(e1);
+          }
+        } else {
+          throw new IoTDBConnectionException("Fail to reconnect to server,"
+              + " please check server status", e);
+        }
       }
     }
   }
@@ -365,7 +418,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertRecords(request).statusList);
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertRecords(request).statusList);
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -463,7 +525,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.deleteTimeseries(sessionId, paths));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.deleteTimeseries(sessionId, paths));
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -496,7 +567,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.deleteData(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.deleteData(request));
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -505,7 +585,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.setStorageGroup(sessionId, 
storageGroupId));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.setStorageGroup(sessionId, 
storageGroupId));
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -522,7 +611,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId, 
storageGroup));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId, 
storageGroup));
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -550,7 +648,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.createTimeseries(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.createTimeseries(request));
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -590,7 +697,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.createMultiTimeseries(request).statusList);
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          
RpcUtils.verifySuccess(client.createMultiTimeseries(request).statusList);
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -647,9 +763,17 @@ public class Session {
     try {
       execResp = client.executeQueryStatement(execReq);
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          execResp = client.executeQueryStatement(execReq);
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
-
     RpcUtils.verifySuccess(execResp.getStatus());
     return new SessionDataSet(sql, execResp.getColumns(), 
execResp.getDataTypeList(), execResp.columnNameIndexMap,
         execResp.getQueryId(), client, sessionId, execResp.queryDataSet);
@@ -664,10 +788,18 @@ public class Session {
       throws IoTDBConnectionException, StatementExecutionException {
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, 
statementId);
     try {
-      TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
-      RpcUtils.verifySuccess(execResp.getStatus());
+      
RpcUtils.verifySuccess(client.executeUpdateStatement(execReq).getStatus());
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          
RpcUtils.verifySuccess(client.executeUpdateStatement(execReq).getStatus());
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 

Reply via email to