This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d27c93  [IOTDB-667] add retry in session and jdbc.execute (#1194)
4d27c93 is described below

commit 4d27c9364e752e63af4b269ba89c477399eb5468
Author: Jialin Qiao <[email protected]>
AuthorDate: Tue May 12 19:38:07 2020 +0800

    [IOTDB-667] add retry in session and jdbc.execute (#1194)
    
    * add retry in session and jdbc.execute()
---
 .../org/apache/iotdb/jdbc/IoTDBConnection.java     |   1 +
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java |  14 +-
 .../java/org/apache/iotdb/session/Session.java     | 165 +++++++++++++++++++--
 3 files changed, 161 insertions(+), 19 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java 
b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index 0f5982c..e634674 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -479,6 +479,7 @@ public class IoTDBConnection implements Connection {
         try {
           Thread.sleep(Config.RETRY_INTERVAL);
         } catch (InterruptedException e1) {
+          Thread.currentThread().interrupt();
           logger.error("reconnect is interrupted.", e1);
         }
       }
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java 
b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index fdd81bc..0096cdb 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -174,11 +174,17 @@ public class IoTDBStatement implements Statement {
       return executeSQL(sql);
     } catch (TException e) {
       if (reConnect()) {
-        throw new SQLException(String.format("Fail to execute %s", sql), e);
+        try {
+          return executeSQL(sql);
+        } catch (TException e2) {
+          throw new SQLException(
+              "Fail to execute sql " + sql + "after reconnecting. please check 
server status",
+              e2);
+        }
       } else {
-        throw new SQLException(String
-                .format("Fail to reconnect to server when executing %s. please 
check server status",
-                        sql), e);
+        throw new SQLException(
+            "Fail to reconnect to server when execute sql " + sql
+                + ". please check server status", e);
       }
     }
   }
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java 
b/session/src/main/java/org/apache/iotdb/session/Session.java
index 07245e6..12a5f56 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -75,6 +75,10 @@ public class Session {
   private ZoneId zoneId;
   private long statementId;
   private int fetchSize;
+  private int connectionTimeoutInMs = Config.DEFAULT_TIMEOUT_MS;
+  private boolean enableRPCCompression = false;
+  private final int RETRY_NUM = 2;
+  private final int RETRY_INTERVAL = 1000;
 
   public Session(String host, int port) {
     this(host, port, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD);
@@ -113,6 +117,8 @@ public class Session {
     if (!isClosed) {
       return;
     }
+    this.enableRPCCompression = enableRPCCompression;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
     transport = new TSocket(host, port, connectionTimeoutInMs);
     if (!transport.isOpen()) {
       try {
@@ -185,6 +191,29 @@ public class Session {
     }
   }
 
+  private boolean reconnect() {
+    boolean flag = false;
+    for (int i = 1; i <= RETRY_NUM; i++) {
+      try {
+        if (transport != null) {
+          transport.close();
+        }
+        isClosed = true;
+        open(enableRPCCompression, connectionTimeoutInMs);
+        flag = true;
+        break;
+      } catch (Exception e) {
+        try {
+          Thread.sleep(RETRY_INTERVAL);
+        } catch (InterruptedException e1) {
+          Thread.currentThread().interrupt();
+          logger.error("reconnection is interrupted.", e1);
+        }
+      }
+    }
+    return flag;
+  }
+
   /**
    * insert data in one row, if you want to improve your performance, please 
use insertRecords method
    * or insertTablet method
@@ -221,7 +250,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertRecord(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertRecord(request));
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -276,7 +314,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertTablet(request).statusList);
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertTablet(request).statusList);
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -331,7 +378,16 @@ public class Session {
       try {
         RpcUtils.verifySuccess(client.insertTablets(request).statusList);
       } catch (TException e) {
-        throw new IoTDBConnectionException(e);
+        if (reconnect()) {
+          try {
+            RpcUtils.verifySuccess(client.insertTablets(request).statusList);
+          } catch (TException e1) {
+            throw new IoTDBConnectionException(e1);
+          }
+        } else {
+          throw new IoTDBConnectionException("Fail to reconnect to server,"
+              + " please check server status", e);
+        }
       }
     }
   }
@@ -365,7 +421,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.insertRecords(request).statusList);
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.insertRecords(request).statusList);
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -463,7 +528,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.deleteTimeseries(sessionId, paths));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.deleteTimeseries(sessionId, paths));
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -496,7 +570,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.deleteData(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.deleteData(request));
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -505,7 +588,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.setStorageGroup(sessionId, 
storageGroupId));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.setStorageGroup(sessionId, 
storageGroupId));
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -522,7 +614,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId, 
storageGroup));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId, 
storageGroup));
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -550,7 +651,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.createTimeseries(request));
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          RpcUtils.verifySuccess(client.createTimeseries(request));
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -590,7 +700,16 @@ public class Session {
     try {
       RpcUtils.verifySuccess(client.createMultiTimeseries(request).statusList);
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          
RpcUtils.verifySuccess(client.createMultiTimeseries(request).statusList);
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 
@@ -647,9 +766,17 @@ public class Session {
     try {
       execResp = client.executeQueryStatement(execReq);
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          execResp = client.executeQueryStatement(execReq);
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
-
     RpcUtils.verifySuccess(execResp.getStatus());
     return new SessionDataSet(sql, execResp.getColumns(), 
execResp.getDataTypeList(), execResp.columnNameIndexMap,
         execResp.getQueryId(), client, sessionId, execResp.queryDataSet);
@@ -664,10 +791,18 @@ public class Session {
       throws IoTDBConnectionException, StatementExecutionException {
     TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, 
statementId);
     try {
-      TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
-      RpcUtils.verifySuccess(execResp.getStatus());
+      
RpcUtils.verifySuccess(client.executeUpdateStatement(execReq).getStatus());
     } catch (TException e) {
-      throw new IoTDBConnectionException(e);
+      if (reconnect()) {
+        try {
+          
RpcUtils.verifySuccess(client.executeUpdateStatement(execReq).getStatus());
+        } catch (TException e1) {
+          throw new IoTDBConnectionException(e1);
+        }
+      } else {
+        throw new IoTDBConnectionException("Fail to reconnect to server,"
+            + " please check server status", e);
+      }
     }
   }
 

Reply via email to