This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4d27c93 [IOTDB-667] add retry in session and jdbc.execute (#1194)
4d27c93 is described below
commit 4d27c9364e752e63af4b269ba89c477399eb5468
Author: Jialin Qiao <[email protected]>
AuthorDate: Tue May 12 19:38:07 2020 +0800
[IOTDB-667] add retry in session and jdbc.execute (#1194)
* add retry in session and jdbc.execute()
---
.../org/apache/iotdb/jdbc/IoTDBConnection.java | 1 +
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 14 +-
.../java/org/apache/iotdb/session/Session.java | 165 +++++++++++++++++++--
3 files changed, 161 insertions(+), 19 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index 0f5982c..e634674 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -479,6 +479,7 @@ public class IoTDBConnection implements Connection {
try {
Thread.sleep(Config.RETRY_INTERVAL);
} catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
logger.error("reconnect is interrupted.", e1);
}
}
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index fdd81bc..0096cdb 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -174,11 +174,17 @@ public class IoTDBStatement implements Statement {
return executeSQL(sql);
} catch (TException e) {
if (reConnect()) {
- throw new SQLException(String.format("Fail to execute %s", sql), e);
+ try {
+ return executeSQL(sql);
+ } catch (TException e2) {
+ throw new SQLException(
+ "Fail to execute sql " + sql + "after reconnecting. please check server status",
+ e2);
+ }
} else {
- throw new SQLException(String
- .format("Fail to reconnect to server when executing %s. please check server status",
- sql), e);
+ throw new SQLException(
+ "Fail to reconnect to server when execute sql " + sql
+ + ". please check server status", e);
}
}
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 07245e6..12a5f56 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -75,6 +75,10 @@ public class Session {
private ZoneId zoneId;
private long statementId;
private int fetchSize;
+ private int connectionTimeoutInMs = Config.DEFAULT_TIMEOUT_MS;
+ private boolean enableRPCCompression = false;
+ private final int RETRY_NUM = 2;
+ private final int RETRY_INTERVAL = 1000;
public Session(String host, int port) {
this(host, port, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD);
@@ -113,6 +117,8 @@ public class Session {
if (!isClosed) {
return;
}
+ this.enableRPCCompression = enableRPCCompression;
+ this.connectionTimeoutInMs = connectionTimeoutInMs;
transport = new TSocket(host, port, connectionTimeoutInMs);
if (!transport.isOpen()) {
try {
@@ -185,6 +191,29 @@ public class Session {
}
}
+ private boolean reconnect() {
+ boolean flag = false;
+ for (int i = 1; i <= RETRY_NUM; i++) {
+ try {
+ if (transport != null) {
+ transport.close();
+ }
+ isClosed = true;
+ open(enableRPCCompression, connectionTimeoutInMs);
+ flag = true;
+ break;
+ } catch (Exception e) {
+ try {
+ Thread.sleep(RETRY_INTERVAL);
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ logger.error("reconnection is interrupted.", e1);
+ }
+ }
+ }
+ return flag;
+ }
+
/**
* insert data in one row, if you want to improve your performance, please use insertRecords method
* or insertTablet method
@@ -221,7 +250,16 @@ public class Session {
try {
RpcUtils.verifySuccess(client.insertRecord(request));
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.insertRecord(request));
+ } catch (TException e1) {
+ throw new IoTDBConnectionException(e1);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server,"
+ + " please check server status", e);
+ }
}
}
@@ -276,7 +314,16 @@ public class Session {
try {
RpcUtils.verifySuccess(client.insertTablet(request).statusList);
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.insertTablet(request).statusList);
+ } catch (TException e1) {
+ throw new IoTDBConnectionException(e1);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server,"
+ + " please check server status", e);
+ }
}
}
@@ -331,7 +378,16 @@ public class Session {
try {
RpcUtils.verifySuccess(client.insertTablets(request).statusList);
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.insertTablets(request).statusList);
+ } catch (TException e1) {
+ throw new IoTDBConnectionException(e1);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server,"
+ + " please check server status", e);
+ }
}
}
}
@@ -365,7 +421,16 @@ public class Session {
try {
RpcUtils.verifySuccess(client.insertRecords(request).statusList);
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.insertRecords(request).statusList);
+ } catch (TException e1) {
+ throw new IoTDBConnectionException(e1);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server,"
+ + " please check server status", e);
+ }
}
}
@@ -463,7 +528,16 @@ public class Session {
try {
RpcUtils.verifySuccess(client.deleteTimeseries(sessionId, paths));
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.deleteTimeseries(sessionId, paths));
+ } catch (TException e1) {
+ throw new IoTDBConnectionException(e1);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server,"
+ + " please check server status", e);
+ }
}
}
@@ -496,7 +570,16 @@ public class Session {
try {
RpcUtils.verifySuccess(client.deleteData(request));
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.deleteData(request));
+ } catch (TException e1) {
+ throw new IoTDBConnectionException(e1);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server,"
+ + " please check server status", e);
+ }
}
}
@@ -505,7 +588,16 @@ public class Session {
try {
RpcUtils.verifySuccess(client.setStorageGroup(sessionId, storageGroupId));
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.setStorageGroup(sessionId, storageGroupId));
+ } catch (TException e1) {
+ throw new IoTDBConnectionException(e1);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server,"
+ + " please check server status", e);
+ }
}
}
@@ -522,7 +614,16 @@ public class Session {
try {
RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId, storageGroup));
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId, storageGroup));
+ } catch (TException e1) {
+ throw new IoTDBConnectionException(e1);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server,"
+ + " please check server status", e);
+ }
}
}
@@ -550,7 +651,16 @@ public class Session {
try {
RpcUtils.verifySuccess(client.createTimeseries(request));
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.createTimeseries(request));
+ } catch (TException e1) {
+ throw new IoTDBConnectionException(e1);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server,"
+ + " please check server status", e);
+ }
}
}
@@ -590,7 +700,16 @@ public class Session {
try {
RpcUtils.verifySuccess(client.createMultiTimeseries(request).statusList);
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.createMultiTimeseries(request).statusList);
+ } catch (TException e1) {
+ throw new IoTDBConnectionException(e1);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server,"
+ + " please check server status", e);
+ }
}
}
@@ -647,9 +766,17 @@ public class Session {
try {
execResp = client.executeQueryStatement(execReq);
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ execResp = client.executeQueryStatement(execReq);
+ } catch (TException e1) {
+ throw new IoTDBConnectionException(e1);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server,"
+ + " please check server status", e);
+ }
}
-
RpcUtils.verifySuccess(execResp.getStatus());
return new SessionDataSet(sql, execResp.getColumns(),
execResp.getDataTypeList(), execResp.columnNameIndexMap,
execResp.getQueryId(), client, sessionId, execResp.queryDataSet);
@@ -664,10 +791,18 @@ public class Session {
throws IoTDBConnectionException, StatementExecutionException {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId);
try {
- TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
- RpcUtils.verifySuccess(execResp.getStatus());
+ RpcUtils.verifySuccess(client.executeUpdateStatement(execReq).getStatus());
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.executeUpdateStatement(execReq).getStatus());
+ } catch (TException e1) {
+ throw new IoTDBConnectionException(e1);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server,"
+ + " please check server status", e);
+ }
}
}