This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch add_retry_in_session in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 0c405e84998f55fbc4e5432ead9079ef3e6bd0f5 Author: qiaojialin <[email protected]> AuthorDate: Tue May 12 14:42:03 2020 +0800 add retry in session and jdbc.execute() --- .../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 14 +- .../java/org/apache/iotdb/session/Session.java | 162 +++++++++++++++++++-- 2 files changed, 157 insertions(+), 19 deletions(-) diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java index fdd81bc..0096cdb 100644 --- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java +++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java @@ -174,11 +174,17 @@ public class IoTDBStatement implements Statement { return executeSQL(sql); } catch (TException e) { if (reConnect()) { - throw new SQLException(String.format("Fail to execute %s", sql), e); + try { + return executeSQL(sql); + } catch (TException e2) { + throw new SQLException( + "Fail to execute sql " + sql + "after reconnecting. please check server status", + e2); + } } else { - throw new SQLException(String - .format("Fail to reconnect to server when executing %s. please check server status", - sql), e); + throw new SQLException( + "Fail to reconnect to server when execute sql " + sql + + ". please check server status", e); } } } diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index 07245e6..0ffb4b4 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -75,6 +75,10 @@ public class Session { private ZoneId zoneId; private long statementId; private int fetchSize; + private int connectionTimeoutInMs = Config.DEFAULT_TIMEOUT_MS; + private boolean enableRPCCompression = false; + private final int RETRY_NUM = 2; + private final int RETRY_INTERVAL = 1000; public Session(String host, int port) { this(host, port, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD); @@ -113,6 +117,8 @@ public class Session { if (!isClosed) { return; } + this.enableRPCCompression = enableRPCCompression; + this.connectionTimeoutInMs = connectionTimeoutInMs; transport = new TSocket(host, port, connectionTimeoutInMs); if (!transport.isOpen()) { try { @@ -185,6 +191,26 @@ public class Session { } } + private boolean reconnect() { + boolean flag = false; + for (int i = 1; i <= RETRY_NUM; i++) { + try { + if (transport != null) { + transport.close(); + } + isClosed = true; + open(enableRPCCompression, connectionTimeoutInMs); + } catch (Exception e) { + try { + Thread.sleep(RETRY_INTERVAL); + } catch (InterruptedException e1) { + logger.error("reconnect is interrupted.", e1); + } + } + } + return flag; + } + /** * insert data in one row, if you want to improve your performance, please use insertRecords method * or insertTablet method @@ -221,7 +247,16 @@ public class Session { try { RpcUtils.verifySuccess(client.insertRecord(request)); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.insertRecord(request)); + } catch (TException e1) { + throw new IoTDBConnectionException(e1); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server," + + " please check server status", e); + } } } @@ -276,7 +311,16 @@ public class Session { try { RpcUtils.verifySuccess(client.insertTablet(request).statusList); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.insertTablet(request).statusList); + } catch (TException e1) { + throw new IoTDBConnectionException(e1); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server," + + " please check server status", e); + } } } @@ -331,7 +375,16 @@ public class Session { try { RpcUtils.verifySuccess(client.insertTablets(request).statusList); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.insertTablets(request).statusList); + } catch (TException e1) { + throw new IoTDBConnectionException(e1); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server," + + " please check server status", e); + } } } } @@ -365,7 +418,16 @@ public class Session { try { RpcUtils.verifySuccess(client.insertRecords(request).statusList); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.insertRecords(request).statusList); + } catch (TException e1) { + throw new IoTDBConnectionException(e1); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server," + + " please check server status", e); + } } } @@ -463,7 +525,16 @@ public class Session { try { RpcUtils.verifySuccess(client.deleteTimeseries(sessionId, paths)); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.deleteTimeseries(sessionId, paths)); + } catch (TException e1) { + throw new IoTDBConnectionException(e1); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server," + + " please check server status", e); + } } } @@ -496,7 +567,16 @@ public class Session { try { RpcUtils.verifySuccess(client.deleteData(request)); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.deleteData(request)); + } catch (TException e1) { + throw new IoTDBConnectionException(e1); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server," + + " please check server status", e); + } } } @@ -505,7 +585,16 @@ public class Session { try { RpcUtils.verifySuccess(client.setStorageGroup(sessionId, storageGroupId)); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.setStorageGroup(sessionId, storageGroupId)); + } catch (TException e1) { + throw new IoTDBConnectionException(e1); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server," + + " please check server status", e); + } } } @@ -522,7 +611,16 @@ public class Session { try { RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId, storageGroup)); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId, storageGroup)); + } catch (TException e1) { + throw new IoTDBConnectionException(e1); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server," + + " please check server status", e); + } } } @@ -550,7 +648,16 @@ public class Session { try { RpcUtils.verifySuccess(client.createTimeseries(request)); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.createTimeseries(request)); + } catch (TException e1) { + throw new IoTDBConnectionException(e1); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server," + + " please check server status", e); + } } } @@ -590,7 +697,16 @@ public class Session { try { RpcUtils.verifySuccess(client.createMultiTimeseries(request).statusList); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.createMultiTimeseries(request).statusList); + } catch (TException e1) { + throw new IoTDBConnectionException(e1); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server," + + " please check server status", e); + } } } @@ -647,9 +763,17 @@ public class Session { try { execResp = client.executeQueryStatement(execReq); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + execResp = client.executeQueryStatement(execReq); + } catch (TException e1) { + throw new IoTDBConnectionException(e1); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server," + + " please check server status", e); + } } - RpcUtils.verifySuccess(execResp.getStatus()); return new SessionDataSet(sql, execResp.getColumns(), execResp.getDataTypeList(), execResp.columnNameIndexMap, execResp.getQueryId(), client, sessionId, execResp.queryDataSet); @@ -664,10 +788,18 @@ public class Session { throws IoTDBConnectionException, StatementExecutionException { TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId); try { - TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq); - RpcUtils.verifySuccess(execResp.getStatus()); + RpcUtils.verifySuccess(client.executeUpdateStatement(execReq).getStatus()); } catch (TException e) { - throw new IoTDBConnectionException(e); + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.executeUpdateStatement(execReq).getStatus()); + } catch (TException e1) { + throw new IoTDBConnectionException(e1); + } + } else { + throw new IoTDBConnectionException("Fail to reconnect to server," + + " please check server status", e); + } } }
