This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 98310822b32 Session insert request won't fail while rolling upgrade
98310822b32 is described below
commit 98310822b32ac95bc693f014715d46c6687ee3f7
Author: Jackie Tien <[email protected]>
AuthorDate: Sun Jan 28 10:26:04 2024 +0800
Session insert request won't fail while rolling upgrade
---
.../org/apache/iotdb/isession/SessionConfig.java | 4 +
.../java/org/apache/iotdb/session/Session.java | 27 +-
.../apache/iotdb/session/SessionConnection.java | 660 ++++++++--
.../org/apache/iotdb/session/pool/SessionPool.java | 1289 +++++++++-----------
.../iotdb/session/SessionConnectionTest.java | 9 +-
.../iotdb/db/queryengine/plan/Coordinator.java | 11 +-
.../plan/scheduler/AsyncSendPlanNodeHandler.java | 4 +-
.../scheduler/FragmentInstanceDispatcherImpl.java | 38 +-
.../apache/iotdb/db/utils/ErrorHandlingUtils.java | 32 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 4 +
.../apache/iotdb/commons/utils/StatusUtils.java | 52 +
.../thrift-commons/src/main/thrift/common.thrift | 1 +
12 files changed, 1289 insertions(+), 842 deletions(-)
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
index 8004201ebeb..ac14a99c80f 100644
---
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
+++
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java
@@ -52,5 +52,9 @@ public class SessionConfig {
public static final boolean DEFAULT_ENABLE_AUTO_FETCH = true;
+ public static final int MAX_RETRY_COUNT = 60;
+
+ public static final long RETRY_INTERVAL_IN_MS = 500;
+
private SessionConfig() {}
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index b05514cf821..4d0ad33efd7 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -162,6 +162,10 @@ public class Session implements ISession {
// default enable
protected boolean enableAutoFetch = true;
+ protected int maxRetryCount = SessionConfig.MAX_RETRY_COUNT;
+
+ protected long retryIntervalInMs = SessionConfig.RETRY_INTERVAL_IN_MS;
+
private static final String REDIRECT_TWICE = "redirect twice";
private static final String REDIRECT_TWICE_RETRY = "redirect twice, please
try again.";
@@ -421,6 +425,8 @@ public class Session implements ISession {
this.trustStore = builder.trustStore;
this.trustStorePwd = builder.trustStorePwd;
this.enableAutoFetch = builder.enableAutoFetch;
+ this.maxRetryCount = builder.maxRetryCount;
+ this.retryIntervalInMs = builder.retryIntervalInMs;
}
@Override
@@ -580,9 +586,11 @@ public class Session implements ISession {
public SessionConnection constructSessionConnection(
Session session, TEndPoint endpoint, ZoneId zoneId) throws
IoTDBConnectionException {
if (endpoint == null) {
- return new SessionConnection(session, zoneId, availableNodes);
+ return new SessionConnection(
+ session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs);
}
- return new SessionConnection(session, endpoint, zoneId, availableNodes);
+ return new SessionConnection(
+ session, endpoint, zoneId, availableNodes, maxRetryCount,
retryIntervalInMs);
}
@Override
@@ -1261,7 +1269,6 @@ public class Session implements ISession {
});
if (connection == null) {
deviceIdToEndpoint.remove(deviceId);
- logger.warn("Can not redirect to {}, because session can not connect
to it.", endpoint);
}
}
}
@@ -3551,6 +3558,10 @@ public class Session implements ISession {
private String trustStore;
private String trustStorePwd;
+ private int maxRetryCount = SessionConfig.MAX_RETRY_COUNT;
+
+ private long retryIntervalInMs = SessionConfig.RETRY_INTERVAL_IN_MS;
+
public Builder useSSL(boolean useSSL) {
this.useSSL = useSSL;
return this;
@@ -3633,6 +3644,16 @@ public class Session implements ISession {
return this;
}
+ public Builder maxRetryCount(int maxRetryCount) {
+ this.maxRetryCount = maxRetryCount;
+ return this;
+ }
+
+ public Builder retryIntervalInMs(long retryIntervalInMs) {
+ this.retryIntervalInMs = retryIntervalInMs;
+ return this;
+ }
+
public Session build() {
if (nodeUrls != null
&& (!SessionConfig.DEFAULT_HOST.equals(host) || rpcPort !=
SessionConfig.DEFAULT_PORT)) {
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index fdda3c31fb7..6cb618e8fa9 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -80,6 +80,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class SessionConnection {
@@ -99,19 +100,32 @@ public class SessionConnection {
private final Supplier<List<TEndPoint>> availableNodes;
+ private final int maxRetryCount;
+
+ private final long retryIntervalInMs;
+
// TestOnly
public SessionConnection() {
availableNodes = Collections::emptyList;
+ this.maxRetryCount = Math.max(0, SessionConfig.MAX_RETRY_COUNT);
+ this.retryIntervalInMs = Math.max(0, SessionConfig.RETRY_INTERVAL_IN_MS);
}
public SessionConnection(
- Session session, TEndPoint endPoint, ZoneId zoneId,
Supplier<List<TEndPoint>> availableNodes)
+ Session session,
+ TEndPoint endPoint,
+ ZoneId zoneId,
+ Supplier<List<TEndPoint>> availableNodes,
+ int maxRetryCount,
+ long retryIntervalInMs)
throws IoTDBConnectionException {
this.session = session;
this.endPoint = endPoint;
endPointList.add(endPoint);
this.zoneId = zoneId == null ? ZoneId.systemDefault() : zoneId;
this.availableNodes = availableNodes;
+ this.maxRetryCount = Math.max(0, maxRetryCount);
+ this.retryIntervalInMs = Math.max(0, retryIntervalInMs);
try {
init(endPoint, session.useSSL, session.trustStore,
session.trustStorePwd);
} catch (IoTDBConnectionException e) {
@@ -119,12 +133,19 @@ public class SessionConnection {
}
}
- public SessionConnection(Session session, ZoneId zoneId,
Supplier<List<TEndPoint>> availableNodes)
+ public SessionConnection(
+ Session session,
+ ZoneId zoneId,
+ Supplier<List<TEndPoint>> availableNodes,
+ int maxRetryCount,
+ long retryIntervalInMs)
throws IoTDBConnectionException {
this.session = session;
this.zoneId = zoneId == null ? ZoneId.systemDefault() : zoneId;
this.endPointList = SessionUtils.parseSeedNodeUrls(session.nodeUrls);
this.availableNodes = availableNodes;
+ this.maxRetryCount = Math.max(0, maxRetryCount);
+ this.retryIntervalInMs = Math.max(0, retryIntervalInMs);
initClusterConn();
}
@@ -414,24 +435,56 @@ public class SessionConnection {
protected void executeNonQueryStatement(String sql)
throws IoTDBConnectionException, StatementExecutionException {
- TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql,
statementId);
- try {
- execReq.setEnableRedirectQuery(enableRedirect);
- TSExecuteStatementResp execResp =
client.executeUpdateStatementV2(execReq);
- RpcUtils.verifySuccess(execResp.getStatus());
- } catch (TException e) {
- if (reconnect()) {
+
+ TSExecuteStatementReq request = new TSExecuteStatementReq(sessionId, sql,
statementId);
+
+ TException lastTException = null;
+ TSStatus status = null;
+ for (int i = 0; i <= maxRetryCount; i++) {
+ if (i > 0) {
+ // re-init the TException and TSStatus
+ lastTException = null;
+ status = null;
+ // not first time, we need to sleep and then reconnect
try {
- execReq.setSessionId(sessionId);
- execReq.setStatementId(statementId);
-
RpcUtils.verifySuccess(client.executeUpdateStatementV2(execReq).status);
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
+ TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+ } catch (InterruptedException e) {
+ // just ignore
}
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
+ if (!reconnect()) {
+ // reconnect failed, just continue to make another retry.
+ continue;
+ }
+ }
+ try {
+ status = executeNonQueryStatementInternal(request);
+ // need retry
+ if (status.isSetNeedRetry() && status.isNeedRetry()) {
+ continue;
+ }
+ // succeed or don't need to retry
+ RpcUtils.verifySuccess(status);
+ return;
+ } catch (TException e) {
+ // all network exception need retry until reaching maxRetryCount
+ lastTException = e;
}
}
+
+ if (status != null) {
+ RpcUtils.verifySuccess(status);
+ } else if (lastTException != null) {
+ throw new IoTDBConnectionException(lastTException);
+ } else {
+ throw new IoTDBConnectionException(logForReconnectionFailure());
+ }
+ }
+
+ private TSStatus executeNonQueryStatementInternal(TSExecuteStatementReq
request)
+ throws TException {
+ request.setSessionId(sessionId);
+ request.setStatementId(statementId);
+ return client.executeUpdateStatementV2(request).status;
}
protected SessionDataSet executeRawDataQuery(
@@ -654,193 +707,544 @@ public class SessionConnection {
protected void insertRecord(TSInsertRecordReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccessWithRedirection(client.insertRecord(request));
- } catch (TException e) {
- if (reconnect()) {
+ TException lastTException = null;
+ TSStatus status = null;
+ for (int i = 0; i <= maxRetryCount; i++) {
+ if (i > 0) {
+ // re-init the TException and TSStatus
+ lastTException = null;
+ status = null;
+ // not first time, we need to sleep and then reconnect
try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.insertRecord(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
+ TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+ } catch (InterruptedException e) {
+ // just ignore
+ }
+ if (!reconnect()) {
+ // reconnect failed, just continue to make another retry.
+ continue;
}
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
}
+ try {
+ status = insertRecordInternal(request);
+ // need retry
+ if (status.isSetNeedRetry() && status.isNeedRetry()) {
+ continue;
+ }
+ // succeed or don't need to retry
+ if (i == 0) {
+ // first time succeed, take account for redirection info
+ RpcUtils.verifySuccessWithRedirection(status);
+ } else {
+ // if it's retry, just ignore redirection info
+ RpcUtils.verifySuccess(status);
+ }
+ return;
+ } catch (TException e) {
+ // all network exception need retry until reaching maxRetryCount
+ lastTException = e;
+ }
+ }
+
+ if (status != null) {
+ RpcUtils.verifySuccess(status);
+ } else if (lastTException != null) {
+ throw new IoTDBConnectionException(lastTException);
+ } else {
+ throw new IoTDBConnectionException(logForReconnectionFailure());
}
}
+ private TSStatus insertRecordInternal(TSInsertRecordReq request) throws
TException {
+ request.setSessionId(sessionId);
+ return client.insertRecord(request);
+ }
+
protected void insertRecord(TSInsertStringRecordReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
- request.setSessionId(sessionId);
- try {
-
RpcUtils.verifySuccessWithRedirection(client.insertStringRecord(request));
- } catch (TException e) {
- if (reconnect()) {
+ TException lastTException = null;
+ TSStatus status = null;
+ for (int i = 0; i <= maxRetryCount; i++) {
+ if (i > 0) {
+ // re-init the TException and TSStatus
+ lastTException = null;
+ status = null;
+ // not first time, we need to sleep and then reconnect
try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.insertStringRecord(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
+ TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+ } catch (InterruptedException e) {
+ // just ignore
+ }
+ if (!reconnect()) {
+ // reconnect failed, just continue to make another retry.
+ continue;
}
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
}
+ try {
+ status = insertRecordInternal(request);
+ // need retry
+ if (status.isSetNeedRetry() && status.isNeedRetry()) {
+ continue;
+ }
+ // succeed or don't need to retry
+ if (i == 0) {
+ // first time succeed, take account for redirection info
+ RpcUtils.verifySuccessWithRedirection(status);
+ } else {
+ // if it's retry, just ignore redirection info
+ RpcUtils.verifySuccess(status);
+ }
+ return;
+ } catch (TException e) {
+ // all network exception need retry until reaching maxRetryCount
+ lastTException = e;
+ }
+ }
+
+ if (status != null) {
+ RpcUtils.verifySuccess(status);
+ } else if (lastTException != null) {
+ throw new IoTDBConnectionException(lastTException);
+ } else {
+ throw new IoTDBConnectionException(logForReconnectionFailure());
}
}
+ private TSStatus insertRecordInternal(TSInsertStringRecordReq request)
throws TException {
+ request.setSessionId(sessionId);
+ return client.insertStringRecord(request);
+ }
+
protected void insertRecords(TSInsertRecordsReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccessWithRedirectionForMultiDevices(
- client.insertRecords(request), request.getPrefixPaths());
- } catch (TException e) {
- if (reconnect()) {
+ TException lastTException = null;
+ TSStatus status = null;
+ for (int i = 0; i <= maxRetryCount; i++) {
+ if (i > 0) {
+ // re-init the TException and TSStatus
+ lastTException = null;
+ status = null;
+ // not first time, we need to sleep and then reconnect
try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.insertRecords(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
+ TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+ } catch (InterruptedException e) {
+ // just ignore
+ }
+ if (!reconnect()) {
+ // reconnect failed, just continue to make another retry.
+ continue;
}
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
+ }
+ try {
+ status = insertRecordsInternal(request);
+ // need retry
+ if (status.isSetNeedRetry() && status.isNeedRetry()) {
+ continue;
+ }
+ // succeed or don't need to retry
+ if (i == 0) {
+ // first time succeed, take account for redirection info
+ RpcUtils.verifySuccessWithRedirectionForMultiDevices(status,
request.getPrefixPaths());
+ } else {
+ // if it's retry, just ignore redirection info
+ RpcUtils.verifySuccess(status);
+ }
+ return;
+ } catch (TException e) {
+ // all network exception need retry until reaching maxRetryCount
+ lastTException = e;
}
}
+
+ if (status != null) {
+ RpcUtils.verifySuccess(status);
+ } else if (lastTException != null) {
+ throw new IoTDBConnectionException(lastTException);
+ } else {
+ throw new IoTDBConnectionException(logForReconnectionFailure());
+ }
+ }
+
+ private TSStatus insertRecordsInternal(TSInsertRecordsReq request) throws
TException {
+ request.setSessionId(sessionId);
+ return client.insertRecords(request);
}
protected void insertRecords(TSInsertStringRecordsReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccessWithRedirectionForMultiDevices(
- client.insertStringRecords(request), request.getPrefixPaths());
- } catch (TException e) {
- if (reconnect()) {
+
+ TException lastTException = null;
+ TSStatus status = null;
+ for (int i = 0; i <= maxRetryCount; i++) {
+ if (i > 0) {
+ // re-init the TException and TSStatus
+ lastTException = null;
+ status = null;
+ // not first time, we need to sleep and then reconnect
try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.insertStringRecords(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
+ TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+ } catch (InterruptedException e) {
+ // just ignore
+ }
+ if (!reconnect()) {
+ // reconnect failed, just continue to make another retry.
+ continue;
}
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
}
+ try {
+ status = insertRecordsInternal(request);
+ // need retry
+ if (status.isSetNeedRetry() && status.isNeedRetry()) {
+ continue;
+ }
+ // succeed or don't need to retry
+ if (i == 0) {
+ // first time succeed, take account for redirection info
+ RpcUtils.verifySuccessWithRedirectionForMultiDevices(status,
request.getPrefixPaths());
+ } else {
+ // if it's retry, just ignore redirection info
+ RpcUtils.verifySuccess(status);
+ }
+ return;
+ } catch (TException e) {
+ // all network exception need retry until reaching maxRetryCount
+ lastTException = e;
+ }
+ }
+
+ if (status != null) {
+ RpcUtils.verifySuccess(status);
+ } else if (lastTException != null) {
+ throw new IoTDBConnectionException(lastTException);
+ } else {
+ throw new IoTDBConnectionException(logForReconnectionFailure());
}
}
+ private TSStatus insertRecordsInternal(TSInsertStringRecordsReq request)
throws TException {
+ request.setSessionId(sessionId);
+ return client.insertStringRecords(request);
+ }
+
protected void insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq
request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
- request.setSessionId(sessionId);
- try {
-
RpcUtils.verifySuccessWithRedirection(client.insertRecordsOfOneDevice(request));
- } catch (TException e) {
- if (reconnect()) {
+
+ TException lastTException = null;
+ TSStatus status = null;
+ for (int i = 0; i <= maxRetryCount; i++) {
+ if (i > 0) {
+ // re-init the TException and TSStatus
+ lastTException = null;
+ status = null;
+ // not first time, we need to sleep and then reconnect
try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.insertRecordsOfOneDevice(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
+ TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+ } catch (InterruptedException e) {
+ // just ignore
}
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
+ if (!reconnect()) {
+ // reconnect failed, just continue to make another retry.
+ continue;
+ }
+ }
+ try {
+ status = insertRecordsOfOneDeviceInternal(request);
+ // need retry
+ if (status.isSetNeedRetry() && status.isNeedRetry()) {
+ continue;
+ }
+ // succeed or don't need to retry
+ if (i == 0) {
+ // first time succeed, take account for redirection info
+ RpcUtils.verifySuccessWithRedirection(status);
+ } else {
+ // if it's retry, just ignore redirection info
+ RpcUtils.verifySuccess(status);
+ }
+ return;
+ } catch (TException e) {
+ // all network exception need retry until reaching maxRetryCount
+ lastTException = e;
}
}
+
+ if (status != null) {
+ RpcUtils.verifySuccess(status);
+ } else if (lastTException != null) {
+ throw new IoTDBConnectionException(lastTException);
+ } else {
+ throw new IoTDBConnectionException(logForReconnectionFailure());
+ }
+ }
+
+ private TSStatus
insertRecordsOfOneDeviceInternal(TSInsertRecordsOfOneDeviceReq request)
+ throws TException {
+ request.setSessionId(sessionId);
+ return client.insertRecordsOfOneDevice(request);
}
protected void
insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
- request.setSessionId(sessionId);
- try {
-
RpcUtils.verifySuccessWithRedirection(client.insertStringRecordsOfOneDevice(request));
- } catch (TException e) {
- if (reconnect()) {
+
+ TException lastTException = null;
+ TSStatus status = null;
+ for (int i = 0; i <= maxRetryCount; i++) {
+ if (i > 0) {
+ // re-init the TException and TSStatus
+ lastTException = null;
+ status = null;
+ // not first time, we need to sleep and then reconnect
try {
- request.setSessionId(sessionId);
-
RpcUtils.verifySuccess(client.insertStringRecordsOfOneDevice(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
+ TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+ } catch (InterruptedException e) {
+ // just ignore
}
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
+ if (!reconnect()) {
+ // reconnect failed, just continue to make another retry.
+ continue;
+ }
+ }
+ try {
+ status = insertStringRecordsOfOneDeviceInternal(request);
+ // need retry
+ if (status.isSetNeedRetry() && status.isNeedRetry()) {
+ continue;
+ }
+ // succeed or don't need to retry
+ if (i == 0) {
+ // first time succeed, take account for redirection info
+ RpcUtils.verifySuccessWithRedirection(status);
+ } else {
+ // if it's retry, just ignore redirection info
+ RpcUtils.verifySuccess(status);
+ }
+ return;
+ } catch (TException e) {
+ // all network exception need retry until reaching maxRetryCount
+ lastTException = e;
}
}
+
+ if (status != null) {
+ RpcUtils.verifySuccess(status);
+ } else if (lastTException != null) {
+ throw new IoTDBConnectionException(lastTException);
+ } else {
+ throw new IoTDBConnectionException(logForReconnectionFailure());
+ }
+ }
+
+ private TSStatus insertStringRecordsOfOneDeviceInternal(
+ TSInsertStringRecordsOfOneDeviceReq request) throws TException {
+ request.setSessionId(sessionId);
+ return client.insertStringRecordsOfOneDevice(request);
}
protected void insertTablet(TSInsertTabletReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccessWithRedirection(client.insertTablet(request));
- } catch (TException e) {
- if (reconnect()) {
+
+ TException lastTException = null;
+ TSStatus status = null;
+ for (int i = 0; i <= maxRetryCount; i++) {
+ if (i > 0) {
+ // re-init the TException and TSStatus
+ lastTException = null;
+ status = null;
+ // not first time, we need to sleep and then reconnect
try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.insertTablet(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
+ TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+ } catch (InterruptedException e) {
+ // just ignore
}
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
+ if (!reconnect()) {
+ // reconnect failed, just continue to make another retry.
+ continue;
+ }
+ }
+ try {
+ status = insertTabletInternal(request);
+ // need retry
+ if (status.isSetNeedRetry() && status.isNeedRetry()) {
+ continue;
+ }
+ // succeed or don't need to retry
+ if (i == 0) {
+ // first time succeed, take account for redirection info
+ RpcUtils.verifySuccessWithRedirection(status);
+ } else {
+ // if it's retry, just ignore redirection info
+ RpcUtils.verifySuccess(status);
+ }
+ return;
+ } catch (TException e) {
+ // all network exception need retry until reaching maxRetryCount
+ lastTException = e;
}
}
+
+ if (status != null) {
+ RpcUtils.verifySuccess(status);
+ } else if (lastTException != null) {
+ throw new IoTDBConnectionException(lastTException);
+ } else {
+ throw new IoTDBConnectionException(logForReconnectionFailure());
+ }
+ }
+
+ private TSStatus insertTabletInternal(TSInsertTabletReq request) throws
TException {
+ request.setSessionId(sessionId);
+ return client.insertTablet(request);
}
protected void insertTablets(TSInsertTabletsReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccessWithRedirectionForMultiDevices(
- client.insertTablets(request), request.getPrefixPaths());
- } catch (TException e) {
- if (reconnect()) {
+
+ TException lastTException = null;
+ TSStatus status = null;
+ for (int i = 0; i <= maxRetryCount; i++) {
+ if (i > 0) {
+ // re-init the TException and TSStatus
+ lastTException = null;
+ status = null;
+ // not first time, we need to sleep and then reconnect
try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.insertTablets(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
+ TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+ } catch (InterruptedException e) {
+ // just ignore
}
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
+ if (!reconnect()) {
+ // reconnect failed, just continue to make another retry.
+ continue;
+ }
+ }
+ try {
+ status = insertTabletsInternal(request);
+ // need retry
+ if (status.isSetNeedRetry() && status.isNeedRetry()) {
+ continue;
+ }
+ // succeed or don't need to retry
+ if (i == 0) {
+ // first time succeed, take account for redirection info
+ RpcUtils.verifySuccessWithRedirectionForMultiDevices(status,
request.getPrefixPaths());
+ } else {
+ // if it's retry, just ignore redirection info
+ RpcUtils.verifySuccess(status);
+ }
+ return;
+ } catch (TException e) {
+ // all network exception need retry until reaching maxRetryCount
+ lastTException = e;
}
}
+
+ if (status != null) {
+ RpcUtils.verifySuccess(status);
+ } else if (lastTException != null) {
+ throw new IoTDBConnectionException(lastTException);
+ } else {
+ throw new IoTDBConnectionException(logForReconnectionFailure());
+ }
+ }
+
+ private TSStatus insertTabletsInternal(TSInsertTabletsReq request) throws
TException {
+ request.setSessionId(sessionId);
+ return client.insertTablets(request);
}
protected void deleteTimeseries(List<String> paths)
throws IoTDBConnectionException, StatementExecutionException {
- try {
- RpcUtils.verifySuccess(client.deleteTimeseries(sessionId, paths));
- } catch (TException e) {
- if (reconnect()) {
+
+ TException lastTException = null;
+ TSStatus status = null;
+ for (int i = 0; i <= maxRetryCount; i++) {
+ if (i > 0) {
+ // re-init the TException and TSStatus
+ lastTException = null;
+ status = null;
+ // not first time, we need to sleep and then reconnect
try {
- RpcUtils.verifySuccess(client.deleteTimeseries(sessionId, paths));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
+ TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+ } catch (InterruptedException e) {
+ // just ignore
}
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
+ if (!reconnect()) {
+ // reconnect failed, just continue to make another retry.
+ continue;
+ }
+ }
+ try {
+ status = client.deleteTimeseries(sessionId, paths);
+ // need retry
+ if (status.isSetNeedRetry() && status.isNeedRetry()) {
+ continue;
+ }
+ // succeed or don't need to retry
+ RpcUtils.verifySuccess(status);
+ return;
+ } catch (TException e) {
+ // all network exception need retry until reaching maxRetryCount
+ lastTException = e;
}
}
+
+ if (status != null) {
+ RpcUtils.verifySuccess(status);
+ } else if (lastTException != null) {
+ throw new IoTDBConnectionException(lastTException);
+ } else {
+ throw new IoTDBConnectionException(logForReconnectionFailure());
+ }
}
public void deleteData(TSDeleteDataReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.deleteData(request));
- } catch (TException e) {
- if (reconnect()) {
+
+ TException lastTException = null;
+ TSStatus status = null;
+ for (int i = 0; i <= maxRetryCount; i++) {
+ if (i > 0) {
+ // re-init the TException and TSStatus
+ lastTException = null;
+ status = null;
+ // not first time, we need to sleep and then reconnect
try {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.deleteData(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
+ TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+ } catch (InterruptedException e) {
+ // just ignore
}
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
+ if (!reconnect()) {
+ // reconnect failed, just continue to make another retry.
+ continue;
+ }
+ }
+ try {
+ status = deleteDataInternal(request);
+ // need retry
+ if (status.isSetNeedRetry() && status.isNeedRetry()) {
+ continue;
+ }
+ // succeed or don't need to retry
+ RpcUtils.verifySuccess(status);
+ return;
+ } catch (TException e) {
+ // all network exception need retry until reaching maxRetryCount
+ lastTException = e;
}
}
+
+ if (status != null) {
+ RpcUtils.verifySuccess(status);
+ } else if (lastTException != null) {
+ throw new IoTDBConnectionException(lastTException);
+ } else {
+ throw new IoTDBConnectionException(logForReconnectionFailure());
+ }
+ }
+
+ private TSStatus deleteDataInternal(TSDeleteDataReq request) throws
TException {
+ request.setSessionId(sessionId);
+ return client.deleteData(request);
}
protected void testInsertRecord(TSInsertStringRecordReq request)
@@ -982,7 +1386,7 @@ public class SessionConnection {
init(endPoint, session.useSSL, session.trustStore,
session.trustStorePwd);
connectedSuccess = true;
} catch (IoTDBConnectionException e) {
- logger.error("The current node may have been down {},try next
node", endPoint);
+ logger.warn("The current node may have been down {},try next
node", endPoint);
continue;
}
break;
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index ef3f5385738..aab6b118391 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -148,6 +148,10 @@ public class SessionPool implements ISessionPool {
private boolean enableAutoFetch = true;
+ protected int maxRetryCount = SessionConfig.MAX_RETRY_COUNT;
+
+ protected long retryIntervalInMs = SessionConfig.RETRY_INTERVAL_IN_MS;
+
private static final String INSERT_RECORD_FAIL = "insertRecord failed";
private static final String INSERT_RECORD_ERROR_MSG = "unexpected error in
insertRecord";
@@ -479,6 +483,9 @@ public class SessionPool implements ISessionPool {
this.useSSL = builder.useSSL;
this.trustStore = builder.trustStore;
this.trustStorePwd = builder.trustStorePwd;
+ this.maxRetryCount = builder.maxRetryCount;
+ this.retryIntervalInMs = builder.retryIntervalInMs;
+
if (enableAutoFetch) {
initThreadPool();
}
@@ -529,6 +536,8 @@ public class SessionPool implements ISessionPool {
.useSSL(useSSL)
.trustStore(trustStore)
.trustStorePwd(trustStorePwd)
+ .maxRetryCount(maxRetryCount)
+ .retryIntervalInMs(retryIntervalInMs)
.build();
} else {
// Construct redirect-able Session
@@ -546,6 +555,8 @@ public class SessionPool implements ISessionPool {
.useSSL(useSSL)
.trustStore(trustStore)
.trustStorePwd(trustStorePwd)
+ .maxRetryCount(maxRetryCount)
+ .retryIntervalInMs(retryIntervalInMs)
.build();
}
session.setEnableQueryRedirection(enableQueryRedirection);
@@ -848,24 +859,21 @@ public class SessionPool implements ISessionPool {
* 3, 3, 3, 3
*/
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertTablet(tablet, sorted);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("insertTablet failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in insertTablet", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertTablet(tablet, sorted);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("insertTablet failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in insertTablet", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -894,24 +902,21 @@ public class SessionPool implements ISessionPool {
@Override
public void insertAlignedTablet(Tablet tablet, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertAlignedTablet(tablet, sorted);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("insertAlignedTablet failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in insertAlignedTablet", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertAlignedTablet(tablet, sorted);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("insertAlignedTablet failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in insertAlignedTablet", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -946,24 +951,21 @@ public class SessionPool implements ISessionPool {
@Override
public void insertTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertTablets(tablets, sorted);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("insertTablets failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in insertTablets", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertTablets(tablets, sorted);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("insertTablets failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in insertTablets", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -976,24 +978,21 @@ public class SessionPool implements ISessionPool {
@Override
public void insertAlignedTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertAlignedTablets(tablets, sorted);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("insertAlignedTablets failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in insertAlignedTablets", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertAlignedTablets(tablets, sorted);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("insertAlignedTablets failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in insertAlignedTablets", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1013,24 +1012,21 @@ public class SessionPool implements ISessionPool {
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertRecords(deviceIds, times, measurementsList, typesList,
valuesList);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("insertRecords failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error(INSERT_RECORDS_ERROR_MSG, e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertRecords(deviceIds, times, measurementsList, typesList,
valuesList);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("insertRecords failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error(INSERT_RECORDS_ERROR_MSG, e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1050,25 +1046,22 @@ public class SessionPool implements ISessionPool {
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertAlignedRecords(
- multiSeriesIds, times, multiMeasurementComponentsList, typesList,
valuesList);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("insertAlignedRecords failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in insertAlignedRecords", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertAlignedRecords(
+ multiSeriesIds, times, multiMeasurementComponentsList, typesList,
valuesList);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("insertAlignedRecords failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in insertAlignedRecords", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1087,25 +1080,22 @@ public class SessionPool implements ISessionPool {
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertRecordsOfOneDevice(
- deviceId, times, measurementsList, typesList, valuesList, false);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertRecordsOfOneDevice(
+ deviceId, times, measurementsList, typesList, valuesList, false);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1126,25 +1116,22 @@ public class SessionPool implements ISessionPool {
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertRecordsOfOneDevice(
- deviceId, times, measurementsList, typesList, valuesList, false);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertRecordsOfOneDevice(
+ deviceId, times, measurementsList, typesList, valuesList, false);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1163,25 +1150,21 @@ public class SessionPool implements ISessionPool {
List<List<String>> measurementsList,
List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertStringRecordsOfOneDevice(
- deviceId, times, measurementsList, valuesList, false);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("insertStringRecordsOfOneDevice failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in insertStringRecordsOfOneDevice", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertStringRecordsOfOneDevice(deviceId, times,
measurementsList, valuesList, false);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("insertStringRecordsOfOneDevice failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in insertStringRecordsOfOneDevice", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1202,25 +1185,22 @@ public class SessionPool implements ISessionPool {
List<List<Object>> valuesList,
boolean haveSorted)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertRecordsOfOneDevice(
- deviceId, times, measurementsList, typesList, valuesList,
haveSorted);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertRecordsOfOneDevice(
+ deviceId, times, measurementsList, typesList, valuesList,
haveSorted);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1243,25 +1223,22 @@ public class SessionPool implements ISessionPool {
List<List<Object>> valuesList,
boolean haveSorted)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertRecordsOfOneDevice(
- deviceId, times, measurementsList, typesList, valuesList,
haveSorted);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertRecordsOfOneDevice(
+ deviceId, times, measurementsList, typesList, valuesList,
haveSorted);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn(INSERT_RECORDS_OF_ONE_DEVICE_FAIL, e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error(INSERT_RECORDS_OF_ONE_DEVICE_ERROR_MSG, e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1282,25 +1259,22 @@ public class SessionPool implements ISessionPool {
List<List<String>> valuesList,
boolean haveSorted)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertStringRecordsOfOneDevice(
- deviceId, times, measurementsList, valuesList, haveSorted);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("insertStringRecordsOfOneDevice failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in insertStringRecordsOfOneDevice", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertStringRecordsOfOneDevice(
+ deviceId, times, measurementsList, valuesList, haveSorted);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("insertStringRecordsOfOneDevice failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in insertStringRecordsOfOneDevice", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1320,25 +1294,22 @@ public class SessionPool implements ISessionPool {
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertAlignedRecordsOfOneDevice(
- deviceId, times, measurementsList, typesList, valuesList, false);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("insertAlignedRecordsOfOneDevice failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in insertAlignedRecordsOfOneDevice", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertAlignedRecordsOfOneDevice(
+ deviceId, times, measurementsList, typesList, valuesList, false);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("insertAlignedRecordsOfOneDevice failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in insertAlignedRecordsOfOneDevice", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1357,25 +1328,21 @@ public class SessionPool implements ISessionPool {
List<List<String>> measurementsList,
List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertAlignedStringRecordsOfOneDevice(
- deviceId, times, measurementsList, valuesList);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("insertAlignedStringRecordsOfOneDevice failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in
insertAlignedStringRecordsOfOneDevice", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertAlignedStringRecordsOfOneDevice(deviceId, times,
measurementsList, valuesList);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("insertAlignedStringRecordsOfOneDevice failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in
insertAlignedStringRecordsOfOneDevice", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1397,25 +1364,23 @@ public class SessionPool implements ISessionPool {
List<List<Object>> valuesList,
boolean haveSorted)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertAlignedRecordsOfOneDevice(
- deviceId, times, measurementsList, typesList, valuesList,
haveSorted);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("insertAlignedRecordsOfOneDevice failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in insertAlignedRecordsOfOneDevice", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertAlignedRecordsOfOneDevice(
+ deviceId, times, measurementsList, typesList, valuesList,
haveSorted);
+ putBack(session);
+ return;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("insertAlignedRecordsOfOneDevice failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in insertAlignedRecordsOfOneDevice", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1436,25 +1401,22 @@ public class SessionPool implements ISessionPool {
List<List<String>> valuesList,
boolean haveSorted)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertAlignedStringRecordsOfOneDevice(
- deviceId, times, measurementsList, valuesList, haveSorted);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("insertAlignedStringRecordsOfOneDevice failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in
insertAlignedStringRecordsOfOneDevice", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertAlignedStringRecordsOfOneDevice(
+ deviceId, times, measurementsList, valuesList, haveSorted);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("insertAlignedStringRecordsOfOneDevice failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in
insertAlignedStringRecordsOfOneDevice", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1472,24 +1434,21 @@ public class SessionPool implements ISessionPool {
List<List<String>> measurementsList,
List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertRecords(deviceIds, times, measurementsList, valuesList);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("insertRecords failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error(INSERT_RECORDS_ERROR_MSG, e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertRecords(deviceIds, times, measurementsList, valuesList);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("insertRecords failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error(INSERT_RECORDS_ERROR_MSG, e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1507,25 +1466,22 @@ public class SessionPool implements ISessionPool {
List<List<String>> multiMeasurementComponentsList,
List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertAlignedRecords(
- multiSeriesIds, times, multiMeasurementComponentsList, valuesList);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("insertAlignedRecords failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in insertAlignedRecords", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertAlignedRecords(
+ multiSeriesIds, times, multiMeasurementComponentsList, valuesList);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("insertAlignedRecords failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in insertAlignedRecords", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1544,24 +1500,21 @@ public class SessionPool implements ISessionPool {
List<TSDataType> types,
Object... values)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertRecord(deviceId, time, measurements, types, values);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.error(INSERT_RECORD_FAIL, e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error(INSERT_RECORD_ERROR_MSG, e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertRecord(deviceId, time, measurements, types, values);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.error(INSERT_RECORD_FAIL, e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error(INSERT_RECORD_ERROR_MSG, e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1580,24 +1533,21 @@ public class SessionPool implements ISessionPool {
List<TSDataType> types,
List<Object> values)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertRecord(deviceId, time, measurements, types, values);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn(INSERT_RECORD_FAIL, e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error(INSERT_RECORD_ERROR_MSG, e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertRecord(deviceId, time, measurements, types, values);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn(INSERT_RECORD_FAIL, e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error(INSERT_RECORD_ERROR_MSG, e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1642,24 +1592,21 @@ public class SessionPool implements ISessionPool {
List<TSDataType> types,
List<Object> values)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertAlignedRecord(multiSeriesId, time,
multiMeasurementComponents, types, values);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("insertAlignedRecord failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in insertAlignedRecord", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertAlignedRecord(multiSeriesId, time,
multiMeasurementComponents, types, values);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("insertAlignedRecord failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in insertAlignedRecord", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1674,24 +1621,21 @@ public class SessionPool implements ISessionPool {
public void insertRecord(
String deviceId, long time, List<String> measurements, List<String>
values)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertRecord(deviceId, time, measurements, values);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn(INSERT_RECORD_FAIL, e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error(INSERT_RECORD_ERROR_MSG, e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertRecord(deviceId, time, measurements, values);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn(INSERT_RECORD_FAIL, e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error(INSERT_RECORD_ERROR_MSG, e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1706,52 +1650,46 @@ public class SessionPool implements ISessionPool {
public void insertAlignedRecord(
String multiSeriesId, long time, List<String>
multiMeasurementComponents, List<String> values)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.insertAlignedRecord(multiSeriesId, time,
multiMeasurementComponents, values);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("insertAlignedRecord failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in insertAlignedRecord", e);
- putBack(session);
- throw new RuntimeException(e);
- }
- }
- }
-
- /**
- * This method NOT insert data into database and the server just return
after accept the request,
- * this method should be used to test other time cost in client
- */
- @Override
- public void testInsertTablet(Tablet tablet)
- throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.testInsertTablet(tablet);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("testInsertTablet failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in testInsertTablet", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.insertAlignedRecord(multiSeriesId, time,
multiMeasurementComponents, values);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("insertAlignedRecord failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in insertAlignedRecord", e);
+ putBack(session);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * This method NOT insert data into database and the server just return
after accept the request,
+ * this method should be used to test other time cost in client
+ */
+ @Override
+ public void testInsertTablet(Tablet tablet)
+ throws IoTDBConnectionException, StatementExecutionException {
+ ISession session = getSession();
+ try {
+ session.testInsertTablet(tablet);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("testInsertTablet failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in testInsertTablet", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1762,24 +1700,21 @@ public class SessionPool implements ISessionPool {
@Override
public void testInsertTablet(Tablet tablet, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.testInsertTablet(tablet, sorted);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("testInsertTablet failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in testInsertTablet", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.testInsertTablet(tablet, sorted);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("testInsertTablet failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in testInsertTablet", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1790,24 +1725,21 @@ public class SessionPool implements ISessionPool {
@Override
public void testInsertTablets(Map<String, Tablet> tablets)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.testInsertTablets(tablets);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("testInsertTablets failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in testInsertTablets", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.testInsertTablets(tablets);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("testInsertTablets failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in testInsertTablets", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1818,24 +1750,21 @@ public class SessionPool implements ISessionPool {
@Override
public void testInsertTablets(Map<String, Tablet> tablets, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.testInsertTablets(tablets, sorted);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("testInsertTablets failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in testInsertTablets", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.testInsertTablets(tablets, sorted);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("testInsertTablets failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in testInsertTablets", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1850,24 +1779,21 @@ public class SessionPool implements ISessionPool {
List<List<String>> measurementsList,
List<List<String>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.testInsertRecords(deviceIds, times, measurementsList,
valuesList);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("testInsertRecords failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in testInsertRecords", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.testInsertRecords(deviceIds, times, measurementsList,
valuesList);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("testInsertRecords failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in testInsertRecords", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1883,24 +1809,21 @@ public class SessionPool implements ISessionPool {
List<List<TSDataType>> typesList,
List<List<Object>> valuesList)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.testInsertRecords(deviceIds, times, measurementsList,
typesList, valuesList);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("testInsertRecords failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in testInsertRecords", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.testInsertRecords(deviceIds, times, measurementsList, typesList,
valuesList);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("testInsertRecords failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in testInsertRecords", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1912,24 +1835,21 @@ public class SessionPool implements ISessionPool {
public void testInsertRecord(
String deviceId, long time, List<String> measurements, List<String>
values)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.testInsertRecord(deviceId, time, measurements, values);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("testInsertRecord failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in testInsertRecord", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.testInsertRecord(deviceId, time, measurements, values);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("testInsertRecord failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in testInsertRecord", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1945,24 +1865,21 @@ public class SessionPool implements ISessionPool {
List<TSDataType> types,
List<Object> values)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.testInsertRecord(deviceId, time, measurements, types, values);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("testInsertRecord failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in testInsertRecord", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.testInsertRecord(deviceId, time, measurements, types, values);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("testInsertRecord failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in testInsertRecord", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -1974,24 +1891,21 @@ public class SessionPool implements ISessionPool {
@Override
public void deleteTimeseries(String path)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.deleteTimeseries(path);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("deleteTimeseries failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in deleteTimeseries", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.deleteTimeseries(path);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("deleteTimeseries failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in deleteTimeseries", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -2003,24 +1917,21 @@ public class SessionPool implements ISessionPool {
@Override
public void deleteTimeseries(List<String> paths)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.deleteTimeseries(paths);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("deleteTimeseries failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in deleteTimeseries", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.deleteTimeseries(paths);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("deleteTimeseries failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in deleteTimeseries", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -2033,24 +1944,21 @@ public class SessionPool implements ISessionPool {
@Override
public void deleteData(String path, long time)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.deleteData(path, time);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn(DELETE_DATA_FAIL, e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error(DELETE_DATA_ERROR_MSG, e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.deleteData(path, time);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn(DELETE_DATA_FAIL, e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error(DELETE_DATA_ERROR_MSG, e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -2063,24 +1971,21 @@ public class SessionPool implements ISessionPool {
@Override
public void deleteData(List<String> paths, long time)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.deleteData(paths, time);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn(DELETE_DATA_FAIL, e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error(DELETE_DATA_ERROR_MSG, e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.deleteData(paths, time);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn(DELETE_DATA_FAIL, e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error(DELETE_DATA_ERROR_MSG, e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -2094,24 +1999,21 @@ public class SessionPool implements ISessionPool {
@Override
public void deleteData(List<String> paths, long startTime, long endTime)
throws IoTDBConnectionException, StatementExecutionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.deleteData(paths, startTime, endTime);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn(DELETE_DATA_FAIL, e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error(DELETE_DATA_ERROR_MSG, e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.deleteData(paths, startTime, endTime);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn(DELETE_DATA_FAIL, e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error(DELETE_DATA_ERROR_MSG, e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -3111,24 +3013,21 @@ public class SessionPool implements ISessionPool {
@Override
public void executeNonQueryStatement(String sql)
throws StatementExecutionException, IoTDBConnectionException {
- for (int i = 0; i < RETRY; i++) {
- ISession session = getSession();
- try {
- session.executeNonQueryStatement(sql);
- putBack(session);
- return;
- } catch (IoTDBConnectionException e) {
- // TException means the connection is broken, remove it and get a new
one.
- LOGGER.warn("executeNonQueryStatement failed", e);
- cleanSessionAndMayThrowConnectionException(session, i, e);
- } catch (StatementExecutionException | RuntimeException e) {
- putBack(session);
- throw e;
- } catch (Throwable e) {
- LOGGER.error("unexpected error in executeNonQueryStatement", e);
- putBack(session);
- throw new RuntimeException(e);
- }
+ ISession session = getSession();
+ try {
+ session.executeNonQueryStatement(sql);
+ putBack(session);
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new
one.
+ LOGGER.warn("executeNonQueryStatement failed", e);
+ cleanSessionAndMayThrowConnectionException(session, FINAL_RETRY, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ } catch (Throwable e) {
+ LOGGER.error("unexpected error in executeNonQueryStatement", e);
+ putBack(session);
+ throw new RuntimeException(e);
}
}
@@ -3615,6 +3514,10 @@ public class SessionPool implements ISessionPool {
private boolean enableAutoFetch;
+ private int maxRetryCount = SessionConfig.MAX_RETRY_COUNT;
+
+ private long retryIntervalInMs = SessionConfig.RETRY_INTERVAL_IN_MS;
+
public Builder useSSL(boolean useSSL) {
this.useSSL = useSSL;
return this;
@@ -3710,6 +3613,16 @@ public class SessionPool implements ISessionPool {
return this;
}
+ public Builder maxRetryCount(int maxRetryCount) {
+ this.maxRetryCount = maxRetryCount;
+ return this;
+ }
+
+ public Builder retryIntervalInMs(long retryIntervalInMs) {
+ this.retryIntervalInMs = retryIntervalInMs;
+ return this;
+ }
+
public SessionPool build() {
return new SessionPool(this);
}
diff --git
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
index 75f75bc9243..61a5abd3865 100644
---
a/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
+++
b/iotdb-client/session/src/test/java/org/apache/iotdb/session/SessionConnectionTest.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.session;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RedirectException;
@@ -169,7 +170,9 @@ public class SessionConnectionTest {
new SessionConnection(
session,
ZoneId.systemDefault(),
- () -> Collections.singletonList(new TEndPoint("local", 12)));
+ () -> Collections.singletonList(new TEndPoint("local", 12)),
+ SessionConfig.MAX_RETRY_COUNT,
+ SessionConfig.RETRY_INTERVAL_IN_MS);
}
@Test(expected = IoTDBConnectionException.class)
@@ -187,7 +190,9 @@ public class SessionConnectionTest {
session,
new TEndPoint("localhost", 1234),
ZoneId.systemDefault(),
- () -> Collections.singletonList(new TEndPoint("local", 12)));
+ () -> Collections.singletonList(new TEndPoint("local", 12)),
+ SessionConfig.MAX_RETRY_COUNT,
+ SessionConfig.RETRY_INTERVAL_IN_MS);
}
@Test
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 2f808350717..6c8c2d15bf9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -54,6 +54,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import static org.apache.iotdb.commons.utils.StatusUtils.needRetry;
+
/**
* The coordinator for MPP. It manages all the queries which are executed in
current Node. And it
* will be responsible for the lifecycle of a query. A query request will be
represented as a
@@ -135,7 +137,7 @@ public class Coordinator {
QueryId globalQueryId = queryIdGenerator.createNextQueryId();
MPPQueryContext queryContext = null;
try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) {
- if (sql != null && sql.length() > 0) {
+ if (sql != null && !sql.isEmpty()) {
LOGGER.debug("[QueryStart] sql: {}", sql);
}
queryContext =
@@ -160,7 +162,12 @@ public class Coordinator {
queryContext.setTimeOut(Long.MAX_VALUE);
}
execution.start();
- return execution.getStatus();
+ ExecutionResult result = execution.getStatus();
+ if (!execution.isQuery() && result.status != null &&
needRetry(result.status)) {
+ // if it's write request and the result status needs to retry
+ result.status.setNeedRetry(true);
+ }
+ return result;
} finally {
int lockNums = queryContext.getAcquiredLockNum();
if (queryContext != null && lockNums > 0) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
index aeff020a0da..a98194fccc4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncSendPlanNodeHandler.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.scheduler;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
-import org.apache.iotdb.consensus.iot.client.DispatchLogHandler;
+import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.mpp.rpc.thrift.TSendBatchPlanNodeResp;
import org.apache.iotdb.mpp.rpc.thrift.TSendSinglePlanNodeResp;
import org.apache.iotdb.rpc.RpcUtils;
@@ -104,6 +104,6 @@ public class AsyncSendPlanNodeHandler implements
AsyncMethodCallback<TSendBatchP
}
private boolean needRetry(TSendSinglePlanNodeResp resp) {
- return !resp.accepted && DispatchLogHandler.needRetry(resp.status.code);
+ return !resp.accepted && resp.status != null &&
StatusUtils.needRetryHelper(resp.status);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index b48e8201629..c542bd09c8c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -284,8 +284,9 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
return this.localhostIpAddr.equals(endPoint.getIp()) &&
localhostInternalPort == endPoint.port;
}
- private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
- throws FragmentInstanceDispatchException {
+ private void dispatchRemoteHelper(FragmentInstance instance, TEndPoint
endPoint)
+ throws FragmentInstanceDispatchException, TException,
ClientManagerException,
+ RatisReadUnavailableException {
try (SyncDataNodeInternalServiceClient client =
syncInternalServiceClientManager.borrowClient(endPoint)) {
switch (instance.getType()) {
@@ -348,18 +349,35 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
TSStatusCode.EXECUTE_STATEMENT_ERROR,
String.format("unknown read type [%s]",
instance.getType())));
}
+ }
+ }
+
+ private void dispatchRemote(FragmentInstance instance, TEndPoint endPoint)
+ throws FragmentInstanceDispatchException {
+
+ try {
+ dispatchRemoteHelper(instance, endPoint);
} catch (ClientManagerException | TException |
RatisReadUnavailableException e) {
logger.warn(
- "can't execute request on node {}, error msg is {}.",
+ "can't execute request on node {}, error msg is {}, and we try to
reconnect this node.",
endPoint,
ExceptionUtils.getRootCause(e).toString());
- TSStatus status = new TSStatus();
- status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
- status.setMessage("can't connect to node " + endPoint);
- // If the DataNode cannot be connected, its endPoint will be put into
black list
- // so that the following retry will avoid dispatching instance towards
this DataNode.
- queryContext.addFailedEndPoint(endPoint);
- throw new FragmentInstanceDispatchException(status);
+ // we just retry once to clear stale connection for a restart node.
+ try {
+ dispatchRemoteHelper(instance, endPoint);
+ } catch (ClientManagerException | TException |
RatisReadUnavailableException e1) {
+ logger.warn(
+ "can't execute request on node {} in second try, error msg is
{}.",
+ endPoint,
+ ExceptionUtils.getRootCause(e1).toString());
+ TSStatus status = new TSStatus();
+ status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+ status.setMessage("can't connect to node " + endPoint);
+ // If the DataNode cannot be connected, its endPoint will be put into
black list
+ // so that the following retry will avoid dispatching instance towards
this DataNode.
+ queryContext.addFailedEndPoint(endPoint);
+ throw new FragmentInstanceDispatchException(status);
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 9d17c3594ac..2bbd1bd0442 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -40,6 +40,8 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
+import static org.apache.iotdb.commons.utils.StatusUtils.needRetry;
+
public class ErrorHandlingUtils {
private ErrorHandlingUtils() {}
@@ -62,7 +64,17 @@ public class ErrorHandlingUtils {
} else {
LOGGER.warn(ERROR_OPERATION_LOG, statusCode, operation, e);
}
- return RpcUtils.getStatus(statusCode, message + e.getMessage());
+ if (e instanceof SemanticException) {
+ Throwable rootCause = getRootCause(e);
+ if (e.getCause() instanceof IoTDBException) {
+ return RpcUtils.getStatus(
+ ((IoTDBException) e.getCause()).getErrorCode(),
rootCause.getMessage());
+ }
+ return RpcUtils.getStatus(TSStatusCode.SEMANTIC_ERROR,
rootCause.getMessage());
+ }
+ TSStatus status = RpcUtils.getStatus(statusCode, message + e.getMessage());
+ status.setNeedRetry(needRetry(status));
+ return status;
}
public static TSStatus onNpeOrUnexpectedException(
@@ -91,6 +103,7 @@ public class ErrorHandlingUtils {
LOGGER.warn(message, e);
}
}
+ status.setNeedRetry(needRetry(status));
return status;
} else {
return onNpeOrUnexpectedException(e, operation, statusCode);
@@ -105,7 +118,7 @@ public class ErrorHandlingUtils {
return onQueryException(e, operation.getName());
}
- public static TSStatus tryCatchQueryException(Exception e) {
+ private static TSStatus tryCatchQueryException(Exception e) {
Throwable rootCause = getRootCause(e);
// ignore logging sg not ready exception
if (rootCause instanceof StorageGroupNotReadyException) {
@@ -146,16 +159,19 @@ public class ErrorHandlingUtils {
public static TSStatus onNonQueryException(Exception e, String operation) {
TSStatus status = tryCatchNonQueryException(e);
- return status != null
- ? status
- : onNpeOrUnexpectedException(e, operation,
TSStatusCode.INTERNAL_SERVER_ERROR);
+ if (status != null) {
+ status.setNeedRetry(needRetry(status));
+ return status;
+ } else {
+ return onNpeOrUnexpectedException(e, operation,
TSStatusCode.INTERNAL_SERVER_ERROR);
+ }
}
public static TSStatus onNonQueryException(Exception e, OperationType
operation) {
return onNonQueryException(e, operation.getName());
}
- public static TSStatus tryCatchNonQueryException(Exception e) {
+ private static TSStatus tryCatchNonQueryException(Exception e) {
String message = "Exception occurred while processing non-read. ";
if (e instanceof BatchProcessException) {
BatchProcessException batchException = (BatchProcessException) e;
@@ -184,7 +200,9 @@ public class ErrorHandlingUtils {
String.format(
"[%s] Exception occurred: %s failed. %s", statusCode, operation,
e.getMessage());
LOGGER.warn(ERROR_OPERATION_LOG, statusCode, operation, e);
- return RpcUtils.getStatus(errorCode, message);
+ TSStatus status = RpcUtils.getStatus(errorCode, message);
+ status.setNeedRetry(needRetry(status));
+ return status;
}
public static TSStatus onIoTDBException(Exception e, OperationType
operation, int errorCode) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 11fd755a8ca..c54a8325685 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -405,6 +405,10 @@ public class CommonConfig {
return status == NodeStatus.ReadOnly;
}
+ public boolean isRunning() {
+ return status == NodeStatus.Running;
+ }
+
public NodeStatus getNodeStatus() {
return status;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
index 83f041b0f6e..be32bf5a459 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
@@ -20,11 +20,15 @@
package org.apache.iotdb.commons.utils;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
public class StatusUtils {
private StatusUtils() {}
@@ -34,6 +38,29 @@ public class StatusUtils {
public static final TSStatus EXECUTE_STATEMENT_ERROR =
getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
+ private static final Set<Integer> NEED_RETRY = new HashSet<>();
+
+ private static final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
+
+ static {
+ NEED_RETRY.add(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ NEED_RETRY.add(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ NEED_RETRY.add(TSStatusCode.DISPATCH_ERROR.getStatusCode());
+ NEED_RETRY.add(TSStatusCode.SYSTEM_READ_ONLY.getStatusCode());
+ NEED_RETRY.add(TSStatusCode.STORAGE_ENGINE_NOT_READY.getStatusCode());
+ NEED_RETRY.add(TSStatusCode.WRITE_PROCESS_ERROR.getStatusCode());
+ NEED_RETRY.add(TSStatusCode.WAL_ERROR.getStatusCode());
+ NEED_RETRY.add(TSStatusCode.DISK_SPACE_INSUFFICIENT.getStatusCode());
+ NEED_RETRY.add(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
+ NEED_RETRY.add(TSStatusCode.INTERNAL_REQUEST_TIME_OUT.getStatusCode());
+ NEED_RETRY.add(TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode());
+ NEED_RETRY.add(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
+ NEED_RETRY.add(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode());
+ NEED_RETRY.add(TSStatusCode.NO_AVAILABLE_REGION_GROUP.getStatusCode());
+ NEED_RETRY.add(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode());
+ NEED_RETRY.add(TSStatusCode.NO_ENOUGH_DATANODE.getStatusCode());
+ }
+
/**
* @param statusMap index -> status
* @param size the total number of status to generate
@@ -171,4 +198,29 @@ public class StatusUtils {
}
return status;
}
+
+ public static boolean needRetry(TSStatus status) {
+ // always retry while node is in not running case
+ if (!COMMON_CONFIG.isRunning()) {
+ return true;
+ } else if (status == null) {
+ return false;
+ }
+ return needRetryHelper(status);
+ }
+
+ public static boolean needRetryHelper(TSStatus status) {
+ int code = status.getCode();
+ if (code == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ for (TSStatus subStatus : status.subStatus) {
+ if (subStatus == null
+ || (subStatus.getCode() != OK.code &&
!NEED_RETRY.contains(subStatus.getCode()))) {
+ return false;
+ }
+ }
+ return true;
+ } else {
+ return NEED_RETRY.contains(code);
+ }
+ }
}
diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
index dbdb8bd406f..b574b52de09 100644
--- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
+++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@ -33,6 +33,7 @@ struct TSStatus {
2: optional string message
3: optional list<TSStatus> subStatus
4: optional TEndPoint redirectNode
+ 5: optional bool needRetry
}
enum TConsensusGroupType {