This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 650bbcc25f7 [RTO/RPO] Unify retry logic on SessionConnection (#14894)
650bbcc25f7 is described below
commit 650bbcc25f7e24bf0942a90b3806a3ff3af555af
Author: William Song <[email protected]>
AuthorDate: Fri Feb 21 15:37:56 2025 +0800
[RTO/RPO] Unify retry logic on SessionConnection (#14894)
* refactor replications in session connection
* fix npe error
* change version to v2
* address review issues
---
.../apache/iotdb/session/SessionConnection.java | 1019 +++++++-------------
.../apache/iotdb/session/util/CheckedSupplier.java | 32 -
2 files changed, 323 insertions(+), 728 deletions(-)
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 31abf69ec2e..9dbbbc343e2 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -63,7 +63,6 @@ import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
-import org.apache.iotdb.session.util.CheckedSupplier;
import org.apache.iotdb.session.util.SessionUtils;
import org.apache.thrift.TException;
@@ -72,6 +71,7 @@ import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -287,12 +287,14 @@ public class SessionConnection {
protected void setTimeZone(String zoneId)
throws StatementExecutionException, IoTDBConnectionException {
- doOperation(
- () -> {
- TSSetTimeZoneReq req = new TSSetTimeZoneReq(sessionId, zoneId);
- RpcUtils.verifySuccess(client.setTimeZone(req));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ TSSetTimeZoneReq req = new TSSetTimeZoneReq(sessionId, zoneId);
+ return client.setTimeZone(req);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
setTimeZoneOfSession(zoneId);
}
@@ -309,50 +311,52 @@ public class SessionConnection {
protected void setStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- RpcUtils.verifySuccess(client.setStorageGroup(sessionId, storageGroup));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(() -> client.setStorageGroup(sessionId, storageGroup)).getResult();
+ RpcUtils.verifySuccess(status);
}
protected void deleteStorageGroups(List<String> storageGroups)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId, storageGroups));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(() -> client.deleteStorageGroups(sessionId, storageGroups)).getResult();
+ RpcUtils.verifySuccess(status);
}
protected void createTimeseries(TSCreateTimeseriesReq request)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.createTimeseries(request));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.createTimeseries(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void createAlignedTimeseries(TSCreateAlignedTimeseriesReq request)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.createAlignedTimeseries(request));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.createAlignedTimeseries(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void createMultiTimeseries(TSCreateMultiTimeseriesReq request)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.createMultiTimeseries(request));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.createMultiTimeseries(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected boolean checkTimeseriesExists(String path, long timeout)
@@ -377,26 +381,22 @@ public class SessionConnection {
TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql,
statementId);
execReq.setFetchSize(session.fetchSize);
execReq.setTimeout(timeout);
- TSExecuteStatementResp execResp;
- try {
- execReq.setEnableRedirectQuery(enableRedirect);
- execResp = client.executeQueryStatementV2(execReq);
+ execReq.setEnableRedirectQuery(enableRedirect);
+
+ RetryResult<TSExecuteStatementResp> result =
+ callWithReconnect(
+ () -> {
+ execReq.setSessionId(sessionId);
+ execReq.setStatementId(statementId);
+ return client.executeQueryStatementV2(execReq);
+ });
+ TSExecuteStatementResp execResp = result.getResult();
+ if (result.getRetryAttempts() == 0) {
RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
- } catch (TException e) {
- if (reconnect()) {
- try {
- execReq.setSessionId(sessionId);
- execReq.setStatementId(statementId);
- execResp = client.executeQueryStatementV2(execReq);
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ } else {
+ RpcUtils.verifySuccess(execResp.getStatus());
}
- RpcUtils.verifySuccess(execResp.getStatus());
return new SessionDataSet(
sql,
execResp.getColumns(),
@@ -419,49 +419,8 @@ public class SessionConnection {
protected void executeNonQueryStatement(String sql)
throws IoTDBConnectionException, StatementExecutionException {
-
TSExecuteStatementReq request = new TSExecuteStatementReq(sessionId, sql,
statementId);
-
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = executeNonQueryStatementInternal(request);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- RpcUtils.verifySuccess(status);
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
-
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ callWithRetryAndVerify(() -> executeNonQueryStatementInternal(request));
}
private TSStatus executeNonQueryStatementInternal(TSExecuteStatementReq
request)
@@ -491,26 +450,23 @@ public class SessionConnection {
new TSRawDataQueryReq(sessionId, paths, startTime, endTime,
statementId);
execReq.setFetchSize(session.fetchSize);
execReq.setTimeout(timeOut);
- TSExecuteStatementResp execResp;
- try {
- execReq.setEnableRedirectQuery(enableRedirect);
- execResp = client.executeRawDataQueryV2(execReq);
+ execReq.setEnableRedirectQuery(enableRedirect);
+
+ RetryResult<TSExecuteStatementResp> result =
+ callWithReconnect(
+ () -> {
+ execReq.setSessionId(sessionId);
+ execReq.setStatementId(statementId);
+ return client.executeRawDataQueryV2(execReq);
+ });
+
+ TSExecuteStatementResp execResp = result.getResult();
+ if (result.getRetryAttempts() == 0) {
RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
- } catch (TException e) {
- if (reconnect()) {
- try {
- execReq.setSessionId(sessionId);
- execReq.setStatementId(statementId);
- execResp = client.executeRawDataQueryV2(execReq);
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ } else {
+ RpcUtils.verifySuccess(execResp.getStatus());
}
- RpcUtils.verifySuccess(execResp.getStatus());
return new SessionDataSet(
"",
execResp.getColumns(),
@@ -538,28 +494,27 @@ public class SessionConnection {
req.setEnableRedirectQuery(enableRedirect);
req.setLegalPathNodes(isLegalPathNodes);
req.setTimeout(timeOut);
- TSExecuteStatementResp tsExecuteStatementResp = null;
TEndPoint redirectedEndPoint = null;
- try {
- tsExecuteStatementResp = client.executeFastLastDataQueryForOneDeviceV2(req);
- RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
- } catch (RedirectException e) {
- redirectedEndPoint = e.getEndPoint();
- } catch (TException e) {
- if (reconnect()) {
- try {
- req.setSessionId(sessionId);
- req.setStatementId(statementId);
- tsExecuteStatementResp = client.executeFastLastDataQueryForOneDeviceV2(req);
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
+
+ RetryResult<TSExecuteStatementResp> result =
+ callWithReconnect(
+ () -> {
+ req.setSessionId(sessionId);
+ req.setStatementId(statementId);
+ return client.executeFastLastDataQueryForOneDeviceV2(req);
+ });
+
+ TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
+ if (result.getRetryAttempts() == 0) {
+ try {
+ RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
+ } catch (RedirectException e) {
+ redirectedEndPoint = e.getEndPoint();
}
+ } else {
+ RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
}
- RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
return new Pair<>(
new SessionDataSet(
"",
@@ -587,25 +542,22 @@ public class SessionConnection {
tsLastDataQueryReq.setFetchSize(session.fetchSize);
tsLastDataQueryReq.setEnableRedirectQuery(enableRedirect);
tsLastDataQueryReq.setTimeout(timeOut);
- TSExecuteStatementResp tsExecuteStatementResp;
- try {
- tsExecuteStatementResp = client.executeLastDataQueryV2(tsLastDataQueryReq);
+
+ RetryResult<TSExecuteStatementResp> result =
+ callWithReconnect(
+ () -> {
+ tsLastDataQueryReq.setSessionId(sessionId);
+ tsLastDataQueryReq.setStatementId(statementId);
+ return client.executeLastDataQueryV2(tsLastDataQueryReq);
+ });
+ final TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
+
+ if (result.getRetryAttempts() == 0) {
RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
- } catch (TException e) {
- if (reconnect()) {
- try {
- tsLastDataQueryReq.setSessionId(sessionId);
- tsLastDataQueryReq.setStatementId(statementId);
- tsExecuteStatementResp = client.executeLastDataQueryV2(tsLastDataQueryReq);
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ } else {
+ RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
}
- RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
return new SessionDataSet(
"",
tsExecuteStatementResp.getColumns(),
@@ -672,25 +624,21 @@ public class SessionConnection {
private SessionDataSet executeAggregationQuery(TSAggregationQueryReq
tsAggregationQueryReq)
throws StatementExecutionException, IoTDBConnectionException,
RedirectException {
- TSExecuteStatementResp tsExecuteStatementResp;
- try {
- tsExecuteStatementResp = client.executeAggregationQueryV2(tsAggregationQueryReq);
+ RetryResult<TSExecuteStatementResp> result =
+ callWithReconnect(
+ () -> {
+ tsAggregationQueryReq.setSessionId(sessionId);
+ tsAggregationQueryReq.setStatementId(statementId);
+ return client.executeAggregationQueryV2(tsAggregationQueryReq);
+ });
+
+ TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
+ if (result.getRetryAttempts() == 0) {
RpcUtils.verifySuccessWithRedirection(tsExecuteStatementResp.getStatus());
- } catch (TException e) {
- if (reconnect()) {
- try {
- tsAggregationQueryReq.setSessionId(sessionId);
- tsAggregationQueryReq.setStatementId(statementId);
- tsExecuteStatementResp = client.executeAggregationQuery(tsAggregationQueryReq);
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ } else {
+ RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
}
- RpcUtils.verifySuccess(tsExecuteStatementResp.getStatus());
return new SessionDataSet(
"",
tsExecuteStatementResp.getColumns(),
@@ -720,52 +668,7 @@ public class SessionConnection {
protected void insertRecord(TSInsertRecordReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = insertRecordInternal(request);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- if (i == 0) {
- // first time succeed, take account for redirection info
- RpcUtils.verifySuccessWithRedirection(status);
- } else {
- // if it's retry, just ignore redirection info
- RpcUtils.verifySuccess(status);
- }
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
-
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ callWithRetryAndVerifyWithRedirection(() -> insertRecordInternal(request));
}
private TSStatus insertRecordInternal(TSInsertRecordReq request) throws
TException {
@@ -775,52 +678,7 @@ public class SessionConnection {
protected void insertRecord(TSInsertStringRecordReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = insertRecordInternal(request);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- if (i == 0) {
- // first time succeed, take account for redirection info
- RpcUtils.verifySuccessWithRedirection(status);
- } else {
- // if it's retry, just ignore redirection info
- RpcUtils.verifySuccess(status);
- }
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
-
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ callWithRetryAndVerifyWithRedirection(() -> insertRecordInternal(request));
}
private TSStatus insertRecordInternal(TSInsertStringRecordReq request)
throws TException {
@@ -830,52 +688,8 @@ public class SessionConnection {
protected void insertRecords(TSInsertRecordsReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = insertRecordsInternal(request);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- if (i == 0) {
- // first time succeed, take account for redirection info
- RpcUtils.verifySuccessWithRedirectionForMultiDevices(status, request.getPrefixPaths());
- } else {
- // if it's retry, just ignore redirection info
- RpcUtils.verifySuccess(status);
- }
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
-
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ callWithRetryAndVerifyWithRedirectionForMultipleDevices(
+ () -> insertRecordsInternal(request), request::getPrefixPaths);
}
private TSStatus insertRecordsInternal(TSInsertRecordsReq request) throws
TException {
@@ -885,53 +699,8 @@ public class SessionConnection {
protected void insertRecords(TSInsertStringRecordsReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
-
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = insertRecordsInternal(request);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- if (i == 0) {
- // first time succeed, take account for redirection info
- RpcUtils.verifySuccessWithRedirectionForMultiDevices(status, request.getPrefixPaths());
- } else {
- // if it's retry, just ignore redirection info
- RpcUtils.verifySuccess(status);
- }
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
-
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ callWithRetryAndVerifyWithRedirectionForMultipleDevices(
+ () -> insertRecordsInternal(request), request::getPrefixPaths);
}
private TSStatus insertRecordsInternal(TSInsertStringRecordsReq request)
throws TException {
@@ -941,53 +710,7 @@ public class SessionConnection {
protected void insertRecordsOfOneDevice(TSInsertRecordsOfOneDeviceReq
request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
-
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = insertRecordsOfOneDeviceInternal(request);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- if (i == 0) {
- // first time succeed, take account for redirection info
- RpcUtils.verifySuccessWithRedirection(status);
- } else {
- // if it's retry, just ignore redirection info
- RpcUtils.verifySuccess(status);
- }
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
-
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ callWithRetryAndVerifyWithRedirection(() -> insertRecordsOfOneDeviceInternal(request));
}
private TSStatus
insertRecordsOfOneDeviceInternal(TSInsertRecordsOfOneDeviceReq request)
@@ -998,53 +721,7 @@ public class SessionConnection {
protected void
insertStringRecordsOfOneDevice(TSInsertStringRecordsOfOneDeviceReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
-
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = insertStringRecordsOfOneDeviceInternal(request);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- if (i == 0) {
- // first time succeed, take account for redirection info
- RpcUtils.verifySuccessWithRedirection(status);
- } else {
- // if it's retry, just ignore redirection info
- RpcUtils.verifySuccess(status);
- }
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
-
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ callWithRetryAndVerifyWithRedirection(() -> insertStringRecordsOfOneDeviceInternal(request));
}
private TSStatus insertStringRecordsOfOneDeviceInternal(
@@ -1053,57 +730,38 @@ public class SessionConnection {
return client.insertStringRecordsOfOneDevice(request);
}
- protected void withRetry(TFunction<TSStatus> function)
+ private void callWithRetryAndVerifyWithRedirectionForMultipleDevices(
+ TFunction<TSStatus> function, Supplier<List<String>> pathSupplier)
throws StatementExecutionException, RedirectException,
IoTDBConnectionException {
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.warn(
- "Thread {} was interrupted during retry {} with wait time {} ms. Exiting retry loop.",
- Thread.currentThread().getName(),
- i,
- retryIntervalInMs);
- break;
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = function.run();
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- if (i == 0) {
- // first time succeed, take account for redirection info
- RpcUtils.verifySuccessWithRedirection(status);
- } else {
- // if it's retry, just ignore redirection info
- RpcUtils.verifySuccess(status);
- }
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
+ RetryResult<TSStatus> result = callWithRetry(function);
+
+ TSStatus status = result.getResult();
+ if (status != null) {
+ if (result.getRetryAttempts() == 0) {
+ RpcUtils.verifySuccessWithRedirectionForMultiDevices(status, pathSupplier.get());
+ } else {
+ RpcUtils.verifySuccess(status);
}
+ } else if (result.getException() != null) {
+ throw new IoTDBConnectionException(result.getException());
+ } else {
+ throw new IoTDBConnectionException(logForReconnectionFailure());
}
+ }
+
+ private void callWithRetryAndVerifyWithRedirection(TFunction<TSStatus> function)
+ throws StatementExecutionException, RedirectException, IoTDBConnectionException {
+ RetryResult<TSStatus> result = callWithRetry(function);
+ TSStatus status = result.getResult();
if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
+ if (result.getRetryAttempts() == 0) {
+ RpcUtils.verifySuccessWithRedirection(status);
+ } else {
+ RpcUtils.verifySuccess(status);
+ }
+ } else if (result.getException() != null) {
+ throw new IoTDBConnectionException(result.getException());
} else {
throw new IoTDBConnectionException(logForReconnectionFailure());
}
@@ -1111,7 +769,7 @@ public class SessionConnection {
protected void insertTablet(TSInsertTabletReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
- withRetry(() -> insertTabletInternal(request));
+ callWithRetryAndVerifyWithRedirection(() -> insertTabletInternal(request));
}
private TSStatus insertTabletInternal(TSInsertTabletReq request) throws
TException {
@@ -1121,53 +779,8 @@ public class SessionConnection {
protected void insertTablets(TSInsertTabletsReq request)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
-
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = insertTabletsInternal(request);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- if (i == 0) {
- // first time succeed, take account for redirection info
- RpcUtils.verifySuccessWithRedirectionForMultiDevices(status, request.getPrefixPaths());
- } else {
- // if it's retry, just ignore redirection info
- RpcUtils.verifySuccess(status);
- }
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
-
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ callWithRetryAndVerifyWithRedirectionForMultipleDevices(
+ () -> insertTabletsInternal(request), request::getPrefixPaths);
}
private TSStatus insertTabletsInternal(TSInsertTabletsReq request) throws
TException {
@@ -1177,55 +790,31 @@ public class SessionConnection {
protected void deleteTimeseries(List<String> paths)
throws IoTDBConnectionException, StatementExecutionException {
+ callWithRetryAndVerify(() -> client.deleteTimeseries(sessionId, paths));
+ }
- TException lastTException = null;
- TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
- if (i > 0) {
- // re-init the TException and TSStatus
- lastTException = null;
- status = null;
- // not first time, we need to sleep and then reconnect
- try {
- TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
- } catch (InterruptedException e) {
- // just ignore
- }
- if (!reconnect()) {
- // reconnect failed, just continue to make another retry.
- continue;
- }
- }
- try {
- status = client.deleteTimeseries(sessionId, paths);
- // need retry
- if (status.isSetNeedRetry() && status.isNeedRetry()) {
- continue;
- }
- // succeed or don't need to retry
- RpcUtils.verifySuccess(status);
- return;
- } catch (TException e) {
- // all network exception need retry until reaching maxRetryCount
- lastTException = e;
- }
- }
+ public void deleteData(TSDeleteDataReq request)
+ throws IoTDBConnectionException, StatementExecutionException {
+ callWithRetryAndVerify(() -> deleteDataInternal(request));
+ }
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
+ private void callWithRetryAndVerify(TFunction<TSStatus> rpc)
+ throws IoTDBConnectionException, StatementExecutionException {
+ RetryResult<TSStatus> result = callWithRetry(rpc);
+ if (result.getResult() != null) {
+ RpcUtils.verifySuccess(result.getResult());
+ } else if (result.getException() != null) {
+ throw new IoTDBConnectionException(result.getException());
} else {
throw new IoTDBConnectionException(logForReconnectionFailure());
}
}
- public void deleteData(TSDeleteDataReq request)
- throws IoTDBConnectionException, StatementExecutionException {
-
+ private RetryResult<TSStatus> callWithRetry(TFunction<TSStatus> rpc) {
TException lastTException = null;
TSStatus status = null;
- for (int i = 0; i <= maxRetryCount; i++) {
+ int i;
+ for (i = 0; i <= maxRetryCount; i++) {
if (i > 0) {
// re-init the TException and TSStatus
lastTException = null;
@@ -1234,7 +823,13 @@ public class SessionConnection {
try {
TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
} catch (InterruptedException e) {
- // just ignore
+ Thread.currentThread().interrupt();
+ logger.warn(
+ "Thread {} was interrupted during retry {} with wait time {} ms. Exiting retry loop.",
+ Thread.currentThread().getName(),
+ i,
+ retryIntervalInMs);
+ break;
}
if (!reconnect()) {
// reconnect failed, just continue to make another retry.
@@ -1242,27 +837,19 @@ public class SessionConnection {
}
}
try {
- status = deleteDataInternal(request);
+ status = rpc.run();
// need retry
if (status.isSetNeedRetry() && status.isNeedRetry()) {
continue;
}
- // succeed or don't need to retry
- RpcUtils.verifySuccess(status);
- return;
+ break;
} catch (TException e) {
// all network exception need retry until reaching maxRetryCount
lastTException = e;
}
}
- if (status != null) {
- RpcUtils.verifySuccess(status);
- } else if (lastTException != null) {
- throw new IoTDBConnectionException(lastTException);
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
+ return new RetryResult<>(status, lastTException, i);
}
private TSStatus deleteDataInternal(TSDeleteDataReq request) throws
TException {
@@ -1272,62 +859,74 @@ public class SessionConnection {
protected void testInsertRecord(TSInsertStringRecordReq request)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.testInsertStringRecord(request));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.testInsertStringRecord(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void testInsertRecord(TSInsertRecordReq request)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.testInsertRecord(request));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.testInsertRecord(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
public void testInsertRecords(TSInsertStringRecordsReq request)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.testInsertStringRecords(request));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.testInsertStringRecords(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
public void testInsertRecords(TSInsertRecordsReq request)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.testInsertRecords(request));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.testInsertRecords(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void testInsertTablet(TSInsertTabletReq request)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.testInsertTablet(request));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.testInsertTablet(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void testInsertTablets(TSInsertTabletsReq request)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.testInsertTablets(request));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.testInsertTablets(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
@SuppressWarnings({
@@ -1380,105 +979,121 @@ public class SessionConnection {
protected void createSchemaTemplate(TSCreateSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.createSchemaTemplate(request));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.createSchemaTemplate(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void appendSchemaTemplate(TSAppendSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.appendSchemaTemplate(request));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.appendSchemaTemplate(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void pruneSchemaTemplate(TSPruneSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.pruneSchemaTemplate(request));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.pruneSchemaTemplate(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req)
throws StatementExecutionException, IoTDBConnectionException {
- return doOperation(
- () -> {
- req.setSessionId(sessionId);
- TSQueryTemplateResp execResp = client.querySchemaTemplate(req);
- RpcUtils.verifySuccess(execResp.getStatus());
- return execResp;
- });
+ final TSQueryTemplateResp execResp =
+ callWithReconnect(
+ () -> {
+ req.setSessionId(sessionId);
+ return client.querySchemaTemplate(req);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(execResp.getStatus());
+ return execResp;
}
protected void setSchemaTemplate(TSSetSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.setSchemaTemplate(request));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.setSchemaTemplate(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void unsetSchemaTemplate(TSUnsetSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.unsetSchemaTemplate(request));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.unsetSchemaTemplate(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void dropSchemaTemplate(TSDropSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.dropSchemaTemplate(request));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.dropSchemaTemplate(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected void createTimeseriesUsingSchemaTemplate(
TCreateTimeseriesUsingSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- doOperation(
- () -> {
- request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.createTimeseriesUsingSchemaTemplate(request));
- return null;
- });
+ final TSStatus status =
+ callWithReconnect(
+ () -> {
+ request.setSessionId(sessionId);
+ return client.createTimeseriesUsingSchemaTemplate(request);
+ })
+ .getResult();
+ RpcUtils.verifySuccess(status);
}
protected TSBackupConfigurationResp getBackupConfiguration()
throws IoTDBConnectionException, StatementExecutionException {
- return doOperation(
- () -> {
- TSBackupConfigurationResp execResp = client.getBackupConfiguration();
- RpcUtils.verifySuccess(execResp.getStatus());
- return execResp;
- });
+ final TSBackupConfigurationResp execResp =
+ callWithReconnect(() -> client.getBackupConfiguration()).getResult();
+ RpcUtils.verifySuccess(execResp.getStatus());
+ return execResp;
}
- private <RETURN> RETURN doOperation(CheckedSupplier<RETURN, TException> supplier)
- throws IoTDBConnectionException, StatementExecutionException {
- RETURN ret;
+ private <T> RetryResult<T> callWithReconnect(TFunction<T> supplier)
+ throws IoTDBConnectionException {
+ T ret;
try {
- ret = supplier.get();
+ ret = supplier.run();
+ return new RetryResult<>(ret, null, 0);
} catch (TException e) {
if (reconnect()) {
try {
- ret = supplier.get();
+ ret = supplier.run();
+ return new RetryResult<>(ret, null, 1);
} catch (TException tException) {
throw new IoTDBConnectionException(tException);
}
@@ -1486,23 +1101,10 @@ public class SessionConnection {
throw new IoTDBConnectionException(logForReconnectionFailure());
}
}
- return ret;
}
public TSConnectionInfoResp fetchAllConnections() throws IoTDBConnectionException {
- try {
- return client.fetchAllConnectionsInfo();
- } catch (TException e) {
- if (reconnect()) {
- try {
- return client.fetchAllConnectionsInfo();
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return callWithReconnect(() -> client.fetchAllConnectionsInfo()).getResult();
}
public boolean isEnableRedirect() {
@@ -1544,4 +1146,29 @@ public class SessionConnection {
private interface TFunction<T> {
T run() throws TException;
}
+
+ private static class RetryResult<T> {
+ private final T result;
+ private final TException exception;
+ private final int retryAttempts;
+
+ public RetryResult(T result, TException exception, int retryAttempts) {
+ Preconditions.checkArgument(result == null || exception == null);
+ this.result = result;
+ this.exception = exception;
+ this.retryAttempts = retryAttempts;
+ }
+
+ public int getRetryAttempts() {
+ return retryAttempts;
+ }
+
+ public TException getException() {
+ return exception;
+ }
+
+ public T getResult() {
+ return result;
+ }
+ }
}
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/CheckedSupplier.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/CheckedSupplier.java
deleted file mode 100644
index 93949524afb..00000000000
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/CheckedSupplier.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.session.util;
-
-import org.apache.iotdb.rpc.StatementExecutionException;
-
-/** Supplier with a throws-clause. */
-@FunctionalInterface
-public interface CheckedSupplier<OUTPUT, THROWABLE extends Throwable> {
- /**
- * The same as {@link java.util.function.Supplier#get()} except that this method is declared with
- * a throws-clause.
- */
- OUTPUT get() throws THROWABLE, StatementExecutionException;
-}