This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 48f01b7 [IOTDB-926] Support reconnection of Session (#1821)
48f01b7 is described below
commit 48f01b7878bfaa4f11916f92b257e22f7e32c644
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Oct 21 03:43:04 2020 +0800
[IOTDB-926] Support reconnection of Session (#1821)
---
.../main/java/org/apache/iotdb/jdbc/Config.java | 2 +-
.../org/apache/iotdb/jdbc/IoTDBConnection.java | 2 +-
.../main/java/org/apache/iotdb/session/Config.java | 3 +-
.../java/org/apache/iotdb/session/Session.java | 131 ++++++++++++++++++---
4 files changed, 119 insertions(+), 19 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
index 2cd4ff8..9025607 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java
@@ -45,7 +45,7 @@ public class Config {
static final String DEFALUT_PASSWORD = "password";
static final int RETRY_NUM = 3;
- static final long RETRY_INTERVAL = 1000;
+ static final long RETRY_INTERVAL_MS = 1000;
static int fetchSize = 10000;
static int connectionTimeoutInMs = 0;
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index e36fd80..b217bb9 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -481,7 +481,7 @@ public class IoTDBConnection implements Connection {
}
} catch (Exception e) {
try {
- Thread.sleep(Config.RETRY_INTERVAL);
+ Thread.sleep(Config.RETRY_INTERVAL_MS);
} catch (InterruptedException e1) {
logger.error("reconnect is interrupted.", e1);
Thread.currentThread().interrupt();
diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/session/src/main/java/org/apache/iotdb/session/Config.java
index 325ae26..b7cfb24 100644
--- a/session/src/main/java/org/apache/iotdb/session/Config.java
+++ b/session/src/main/java/org/apache/iotdb/session/Config.java
@@ -24,5 +24,6 @@ public class Config {
public static final String DEFAULT_PASSWORD = "password";
public static final int DEFAULT_FETCH_SIZE = 10000;
public static final int DEFAULT_TIMEOUT_MS = 0;
-
+ public static final int RETRY_NUM = 3;
+ public static final long RETRY_INTERVAL_MS = 1000;
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 4fc2404..036caa0 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -29,9 +29,6 @@ import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
@@ -40,12 +37,16 @@ import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
-import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
+import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
@@ -66,7 +67,6 @@ import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
public class Session {
@@ -83,6 +83,8 @@ public class Session {
private ZoneId zoneId;
private long statementId;
private int fetchSize;
+ private boolean enableRPCCompression;
+ private int connectionTimeoutInMs;
public Session(String host, int rpcPort) {
this(host, rpcPort, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD);
@@ -122,6 +124,9 @@ public class Session {
return;
}
+ this.enableRPCCompression = enableRPCCompression;
+ this.connectionTimeoutInMs = connectionTimeoutInMs;
+
transport = new TFastFramedTransport(new TSocket(host, rpcPort,
connectionTimeoutInMs));
if (!transport.isOpen()) {
@@ -239,7 +244,15 @@ public class Session {
try {
RpcUtils.verifySuccess(client.insertTablet(request));
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.insertTablet(request));
+ } catch (TException tException) {
+ throw new IoTDBConnectionException(tException);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status");
+ }
}
}
@@ -293,7 +306,15 @@ public class Session {
try {
RpcUtils.verifySuccess(client.insertTablets(request));
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.insertTablets(request));
+ } catch (TException tException) {
+ throw new IoTDBConnectionException(tException);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status");
+ }
}
}
@@ -345,7 +366,15 @@ public class Session {
try {
RpcUtils.verifySuccess(client.insertRecords(request));
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.insertRecords(request));
+ } catch (TException tException) {
+ throw new IoTDBConnectionException(tException);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status");
+ }
}
}
@@ -393,7 +422,15 @@ public class Session {
try {
RpcUtils.verifySuccess(client.insertStringRecords(request));
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.insertStringRecords(request));
+ } catch (TException tException) {
+ throw new IoTDBConnectionException(tException);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status");
+ }
}
}
@@ -430,7 +467,15 @@ public class Session {
try {
RpcUtils.verifySuccess(client.insertRecord(request));
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.insertRecord(request));
+ } catch (TException tException) {
+ throw new IoTDBConnectionException(tException);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status");
+ }
}
}
@@ -465,7 +510,15 @@ public class Session {
try {
RpcUtils.verifySuccess(client.insertStringRecord(request));
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ RpcUtils.verifySuccess(client.insertStringRecord(request));
+ } catch (TException tException) {
+ throw new IoTDBConnectionException(tException);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status");
+ }
}
}
@@ -909,7 +962,15 @@ public class Session {
try {
execResp = client.executeQueryStatement(execReq);
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ execResp = client.executeQueryStatement(execReq);
+ } catch (TException tException) {
+ throw new IoTDBConnectionException(tException);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status");
+ }
}
RpcUtils.verifySuccess(execResp.getStatus());
@@ -927,12 +988,21 @@ public class Session {
public void executeNonQueryStatement(String sql)
throws IoTDBConnectionException, StatementExecutionException {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql,
statementId);
+ TSExecuteStatementResp execResp;
try {
- TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq);
- RpcUtils.verifySuccess(execResp.getStatus());
+ execResp = client.executeUpdateStatement(execReq);
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ execResp = client.executeUpdateStatement(execReq);
+ } catch (TException tException) {
+ throw new IoTDBConnectionException(tException);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status");
+ }
}
+ RpcUtils.verifySuccess(execResp.getStatus());
}
/**
@@ -957,7 +1027,15 @@ public class Session {
try {
execResp = client.executeRawDataQuery(execReq);
} catch (TException e) {
- throw new IoTDBConnectionException(e);
+ if (reconnect()) {
+ try {
+ execResp = client.executeRawDataQuery(execReq);
+ } catch (TException tException) {
+ throw new IoTDBConnectionException(tException);
+ }
+ } else {
+ throw new IoTDBConnectionException("Fail to reconnect to server. Please check server status");
+ }
}
RpcUtils.verifySuccess(execResp.getStatus());
@@ -1057,4 +1135,25 @@ public class Session {
}
}
+ /**
+  * Tries to re-establish the session by closing it and opening it again with the
+  * compression and timeout settings remembered from the last {@code open} call.
+  *
+  * <p>At most {@code Config.RETRY_NUM} attempts are made, pausing
+  * {@code Config.RETRY_INTERVAL_MS} milliseconds after each failed attempt. The method
+  * stops at the first successful attempt so that a freshly opened connection is not
+  * closed and reopened again by the remaining iterations.
+  *
+  * @return {@code true} if an attempt completed without throwing; {@code false} if there
+  *     is no transport to reconnect, all attempts failed, or the thread was interrupted
+  *     while waiting between attempts
+  */
+ private boolean reconnect() {
+   for (int attempt = 1; attempt <= Config.RETRY_NUM; attempt++) {
+     // Nothing to reconnect if no transport exists; further attempts cannot change that.
+     if (transport == null) {
+       return false;
+     }
+     try {
+       close();
+       open(enableRPCCompression, connectionTimeoutInMs);
+       // Success: return immediately instead of tearing the new connection down again.
+       return true;
+     } catch (Exception e) {
+       // Record why this attempt failed instead of silently discarding the cause.
+       logger.warn("Reconnect attempt {}/{} failed.", attempt, Config.RETRY_NUM, e);
+       try {
+         Thread.sleep(Config.RETRY_INTERVAL_MS);
+       } catch (InterruptedException e1) {
+         logger.error("reconnect is interrupted.", e1);
+         Thread.currentThread().interrupt();
+         // The interrupt flag is set again, so any further sleep would fail at once;
+         // give up rather than spinning through the remaining attempts.
+         return false;
+       }
+     }
+   }
+   return false;
+ }
+
}