This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 571dcbe19c9 [RTO/RPO] Coordinator/Session Failover Retry (#15269)
571dcbe19c9 is described below
commit 571dcbe19c9ab16a53a5e6b1d4239cf1a9daf32d
Author: William Song <[email protected]>
AuthorDate: Tue Apr 22 14:33:03 2025 +0800
[RTO/RPO] Coordinator/Session Failover Retry (#15269)
* rto/rpo retry
* enable restart IT
* add apache license
* add apache license
* add apache license
* re-run CI
* add apache license
* add retry in jdbc
* remove reconnect on jdbc
* Debug
* do value copy
* add license
* spotless
* Debug2
* add reconnect
* partial
* Update MeasurementGroup.java
* Update AutoCreateSchemaExecutor.java
* Update MeasurementGroup.java
* Update MeasurementGroup.java
* maybe final
* Debug logger removal
* partial
* Removed useless
* add and retry
---------
Co-authored-by: Caideyipi <[email protected]>
---
.../org/apache/iotdb/db/it/IoTDBRestartIT.java | 2 -
.../org/apache/iotdb/db/it/utils/TestUtils.java | 8 -
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 134 ++++++-----
.../apache/iotdb/session/SessionConnection.java | 148 +++++++++---
.../iotdb/db/queryengine/plan/Coordinator.java | 4 -
.../analyze/schema/AutoCreateSchemaExecutor.java | 14 +-
.../analyze/schema/ClusterSchemaFetchExecutor.java | 9 +-
.../plan/analyze/schema/ClusterSchemaFetcher.java | 1 -
.../queryengine/plan/planner/TreeModelPlanner.java | 2 -
.../plan/node/metadata/write/MeasurementGroup.java | 25 ++-
.../plan/relational/planner/TableModelPlanner.java | 9 -
.../plan/scheduler/AsyncPlanNodeSender.java | 21 +-
.../plan/scheduler/ClusterScheduler.java | 5 -
.../FailedFragmentInstanceWithStatus.java | 41 ++++
.../scheduler/FragmentInstanceDispatcherImpl.java | 248 ++++++++++++---------
.../apache/iotdb/db/utils/ErrorHandlingUtils.java | 4 +-
.../exception/QuerySchemaFetchFailedException.java | 30 +++
.../apache/iotdb/commons/utils/StatusUtils.java | 1 -
18 files changed, 461 insertions(+), 245 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java
index 603196e871a..14d298e31bd 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestartIT.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -47,7 +46,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
-@Ignore("enable this after RTO/RPO retry")
@RunWith(IoTDBTestRunner.class)
@Category({LocalStandaloneIT.class, ClusterIT.class})
public class IoTDBRestartIT {
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index fef5dc2e469..2a64278b1e8 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -20,14 +20,12 @@
package org.apache.iotdb.db.it.utils;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
-import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.itbase.env.BaseEnv;
-import org.apache.iotdb.itbase.env.BaseNodeWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
@@ -45,7 +43,6 @@ import java.sql.Statement;
import java.text.DateFormat;
import java.time.ZoneId;
import java.time.ZoneOffset;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -1709,11 +1706,6 @@ public class TestUtils {
long retryIntervalMS = 1000;
while (true) {
try (Connection connection = EnvFactory.getEnv().getConnection()) {
- final List<BaseNodeWrapper> allDataNodes =
- new ArrayList<>(EnvFactory.getEnv().getDataNodeWrapperList());
- EnvFactory.getEnv()
- .ensureNodeStatus(
- allDataNodes, Collections.nCopies(allDataNodes.size(),
NodeStatus.Running));
break;
} catch (Exception e) {
try {
diff --git
a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index c36d905dd90..820f8e19337 100644
--- a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -45,6 +45,8 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import static org.apache.iotdb.jdbc.Constant.TABLE;
import static org.apache.iotdb.jdbc.Constant.TREE;
@@ -274,18 +276,10 @@ public class IoTDBStatement implements Statement {
try {
return executeSQL(sql);
} catch (TException e) {
- if (reConnect()) {
- try {
- return executeSQL(sql);
- } catch (TException e2) {
- throw new SQLException(e2);
- }
- } else {
- throw new SQLException(
- String.format(
- "Fail to reconnect to server when executing %s. please check
server status", sql),
- e);
- }
+ throw new SQLException(
+ String.format(
+ "Fail to reconnect to server when executing %s. please check
server status", sql),
+ e);
}
}
@@ -304,6 +298,54 @@ public class IoTDBStatement implements Statement {
throw new SQLException(NOT_SUPPORT_EXECUTE);
}
+ private interface TFunction<T> {
+ T run() throws TException;
+ }
+
+ private <T> T callWithRetryAndReconnect(TFunction<T> rpc, Function<T,
TSStatus> statusGetter)
+ throws SQLException, TException {
+ TException lastTException = null;
+ T result = null;
+ int retryAttempt;
+ int maxRetryCount = 5;
+ int retryIntervalInMs = 1000;
+ for (retryAttempt = 0; retryAttempt <= maxRetryCount; retryAttempt++) {
+ // 1. try to execute the rpc
+ try {
+ result = rpc.run();
+ lastTException = null;
+ } catch (TException e) {
+ result = null;
+ lastTException = e;
+ }
+
+ TSStatus status = null;
+ if (result != null) {
+ status = statusGetter.apply(result);
+ }
+ // success, return immediately
+ if (status != null && !(status.isSetNeedRetry() &&
status.isNeedRetry())) {
+ return result;
+ }
+
+ // prepare for the next retry
+ if (lastTException != null) {
+ reConnect();
+ }
+ try {
+ TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+
+ if (result == null && lastTException != null) {
+ throw lastTException;
+ }
+ return result;
+ }
+
/**
* There are two kinds of sql here: (1) query sql (2) update sql.
*
@@ -318,7 +360,9 @@ public class IoTDBStatement implements Statement {
}
execReq.setFetchSize(rows);
execReq.setTimeout((long) queryTimeout * 1000);
- TSExecuteStatementResp execResp = client.executeStatementV2(execReq);
+ TSExecuteStatementResp execResp =
+ callWithRetryAndReconnect(
+ () -> client.executeStatementV2(execReq),
TSExecuteStatementResp::getStatus);
try {
RpcUtils.verifySuccess(execResp.getStatus());
} catch (StatementExecutionException e) {
@@ -370,26 +414,18 @@ public class IoTDBStatement implements Statement {
try {
return executeBatchSQL();
} catch (TException e) {
- if (reConnect()) {
- try {
- return executeBatchSQL();
- } catch (TException e2) {
- throw new SQLException(
- "Fail to execute batch sqls after reconnecting. please check
server status", e2);
- }
- } else {
- throw new SQLException(
- "Fail to reconnect to server when executing batch sqls. please
check server status", e);
- }
+ throw new SQLException(
+ "Fail to reconnect to server when executing batch sqls. please check
server status", e);
} finally {
clearBatch();
}
}
- private int[] executeBatchSQL() throws TException, BatchUpdateException {
+ private int[] executeBatchSQL() throws TException, BatchUpdateException,
SQLException {
isCancelled = false;
TSExecuteBatchStatementReq execReq = new
TSExecuteBatchStatementReq(sessionId, batchSQLList);
- TSStatus execResp = client.executeBatchStatement(execReq);
+ TSStatus execResp =
+ callWithRetryAndReconnect(() -> client.executeBatchStatement(execReq),
status -> status);
int[] result = new int[batchSQLList.size()];
boolean allSuccess = true;
StringBuilder message = new StringBuilder(System.lineSeparator());
@@ -444,20 +480,9 @@ public class IoTDBStatement implements Statement {
try {
return executeQuerySQL(sql, timeoutInMS);
} catch (TException e) {
- if (reConnect()) {
- try {
- return executeQuerySQL(sql, timeoutInMS);
- } catch (TException e2) {
- throw new SQLException(
- "Fail to executeQuery " + sql + "after reconnecting. please
check server status", e2);
- }
- } else {
- throw new SQLException(
- "Fail to reconnect to server when execute query "
- + sql
- + ". please check server status",
- e);
- }
+ throw new SQLException(
+ "Fail to reconnect to server when execute query " + sql + ". please
check server status",
+ e);
}
}
@@ -471,7 +496,9 @@ public class IoTDBStatement implements Statement {
execReq.setFetchSize(rows);
execReq.setTimeout(timeoutInMS);
execReq.setJdbcQuery(true);
- TSExecuteStatementResp execResp = client.executeQueryStatementV2(execReq);
+ TSExecuteStatementResp execResp =
+ callWithRetryAndReconnect(
+ () -> client.executeQueryStatementV2(execReq),
TSExecuteStatementResp::getStatus);
queryId = execResp.getQueryId();
try {
RpcUtils.verifySuccess(execResp.getStatus());
@@ -520,21 +547,9 @@ public class IoTDBStatement implements Statement {
try {
return executeUpdateSQL(sql);
} catch (TException e) {
- if (reConnect()) {
- try {
- return executeUpdateSQL(sql);
- } catch (TException e2) {
- throw new SQLException(
- "Fail to execute update " + sql + "after reconnecting. please
check server status",
- e2);
- }
- } else {
- throw new SQLException(
- "Fail to reconnect to server when execute update "
- + sql
- + ". please check server status",
- e);
- }
+ throw new SQLException(
+ "Fail to reconnect to server when execute update " + sql + ". please
check server status",
+ e);
}
}
@@ -553,9 +568,12 @@ public class IoTDBStatement implements Statement {
throw new SQLException(NOT_SUPPORT_EXECUTE_UPDATE);
}
- private int executeUpdateSQL(final String sql) throws TException,
IoTDBSQLException {
+ private int executeUpdateSQL(final String sql)
+ throws TException, IoTDBSQLException, SQLException {
final TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId,
sql, stmtId);
- final TSExecuteStatementResp execResp =
client.executeUpdateStatement(execReq);
+ final TSExecuteStatementResp execResp =
+ callWithRetryAndReconnect(
+ () -> client.executeUpdateStatement(execReq),
TSExecuteStatementResp::getStatus);
if (execResp.isSetQueryId()) {
queryId = execResp.getQueryId();
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 9dbbbc343e2..58d46d4b5a3 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
import
org.apache.iotdb.service.rpc.thrift.TCreateTimeseriesUsingSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
@@ -83,6 +84,8 @@ import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import static org.apache.iotdb.session.Session.TABLE;
@@ -288,7 +291,7 @@ public class SessionConnection {
protected void setTimeZone(String zoneId)
throws StatementExecutionException, IoTDBConnectionException {
final TSStatus status =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
TSSetTimeZoneReq req = new TSSetTimeZoneReq(sessionId,
zoneId);
return client.setTimeZone(req);
@@ -312,21 +315,23 @@ public class SessionConnection {
protected void setStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(() -> client.setStorageGroup(sessionId,
storageGroup)).getResult();
+ callWithRetryAndReconnect(() -> client.setStorageGroup(sessionId,
storageGroup))
+ .getResult();
RpcUtils.verifySuccess(status);
}
protected void deleteStorageGroups(List<String> storageGroups)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(() -> client.deleteStorageGroups(sessionId,
storageGroups)).getResult();
+ callWithRetryAndReconnect(() -> client.deleteStorageGroups(sessionId,
storageGroups))
+ .getResult();
RpcUtils.verifySuccess(status);
}
protected void createTimeseries(TSCreateTimeseriesReq request)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
request.setSessionId(sessionId);
return client.createTimeseries(request);
@@ -338,7 +343,7 @@ public class SessionConnection {
protected void createAlignedTimeseries(TSCreateAlignedTimeseriesReq request)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
request.setSessionId(sessionId);
return client.createAlignedTimeseries(request);
@@ -350,7 +355,7 @@ public class SessionConnection {
protected void createMultiTimeseries(TSCreateMultiTimeseriesReq request)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
request.setSessionId(sessionId);
return client.createMultiTimeseries(request);
@@ -384,12 +389,13 @@ public class SessionConnection {
execReq.setEnableRedirectQuery(enableRedirect);
RetryResult<TSExecuteStatementResp> result =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
execReq.setSessionId(sessionId);
execReq.setStatementId(statementId);
return client.executeQueryStatementV2(execReq);
- });
+ },
+ TSExecuteStatementResp::getStatus);
TSExecuteStatementResp execResp = result.getResult();
if (result.getRetryAttempts() == 0) {
RpcUtils.verifySuccessWithRedirection(execResp.getStatus());
@@ -453,12 +459,13 @@ public class SessionConnection {
execReq.setEnableRedirectQuery(enableRedirect);
RetryResult<TSExecuteStatementResp> result =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
execReq.setSessionId(sessionId);
execReq.setStatementId(statementId);
return client.executeRawDataQueryV2(execReq);
- });
+ },
+ TSExecuteStatementResp::getStatus);
TSExecuteStatementResp execResp = result.getResult();
if (result.getRetryAttempts() == 0) {
@@ -497,12 +504,13 @@ public class SessionConnection {
TEndPoint redirectedEndPoint = null;
RetryResult<TSExecuteStatementResp> result =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
req.setSessionId(sessionId);
req.setStatementId(statementId);
return client.executeFastLastDataQueryForOneDeviceV2(req);
- });
+ },
+ TSExecuteStatementResp::getStatus);
TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
if (result.getRetryAttempts() == 0) {
@@ -544,12 +552,13 @@ public class SessionConnection {
tsLastDataQueryReq.setTimeout(timeOut);
RetryResult<TSExecuteStatementResp> result =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
tsLastDataQueryReq.setSessionId(sessionId);
tsLastDataQueryReq.setStatementId(statementId);
return client.executeLastDataQueryV2(tsLastDataQueryReq);
- });
+ },
+ TSExecuteStatementResp::getStatus);
final TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
if (result.getRetryAttempts() == 0) {
@@ -625,12 +634,13 @@ public class SessionConnection {
private SessionDataSet executeAggregationQuery(TSAggregationQueryReq
tsAggregationQueryReq)
throws StatementExecutionException, IoTDBConnectionException,
RedirectException {
RetryResult<TSExecuteStatementResp> result =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
tsAggregationQueryReq.setSessionId(sessionId);
tsAggregationQueryReq.setStatementId(statementId);
return client.executeAggregationQueryV2(tsAggregationQueryReq);
- });
+ },
+ TSExecuteStatementResp::getStatus);
TSExecuteStatementResp tsExecuteStatementResp = result.getResult();
if (result.getRetryAttempts() == 0) {
@@ -852,6 +862,73 @@ public class SessionConnection {
return new RetryResult<>(status, lastTException, i);
}
+ private RetryResult<TSStatus> callWithRetryAndReconnect(TFunction<TSStatus>
rpc) {
+ return callWithRetryAndReconnect(
+ rpc,
+ status -> status.isSetNeedRetry() && status.isNeedRetry(),
+ status -> status.getCode() ==
TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode());
+ }
+
+ private <T> RetryResult<T> callWithRetryAndReconnect(
+ TFunction<T> rpc, Function<T, TSStatus> statusGetter) {
+ return callWithRetryAndReconnect(
+ rpc,
+ t -> {
+ final TSStatus status = statusGetter.apply(t);
+ return status.isSetNeedRetry() && status.isNeedRetry();
+ },
+ t ->
+ statusGetter.apply(t).getCode()
+ == TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode());
+ }
+
+ /** reconnect if the remote datanode is unreachable retry if the status is
set to needRetry */
+ private <T> RetryResult<T> callWithRetryAndReconnect(
+ TFunction<T> rpc, Predicate<T> shouldRetry, Predicate<T> forceReconnect)
{
+ TException lastTException = null;
+ T result = null;
+ int retryAttempt;
+ int maxRetryCountRead = 10;
+ for (retryAttempt = 0; retryAttempt <= maxRetryCountRead; retryAttempt++) {
+ // 1. try to execute the rpc
+ try {
+ result = rpc.run();
+ lastTException = null;
+ } catch (TException e) {
+ result = null;
+ lastTException = e;
+ }
+
+ // success, return immediately
+ if (result != null && !shouldRetry.test(result)) {
+ return new RetryResult<>(result, null, retryAttempt);
+ }
+
+ // prepare for the next retry
+ if (lastTException != null
+ || !availableNodes.get().contains(this.endPoint)
+ || (result != null && forceReconnect.test(result))) {
+ // 1. the current datanode is unreachable (TException)
+ // 2. the current datanode is partitioned with other nodes (not in
availableNodes)
+ // 3. asymmetric network partition
+ reconnect();
+ }
+ try {
+ TimeUnit.MILLISECONDS.sleep(retryIntervalInMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn(
+ "Thread {} was interrupted during retry {} with wait time {} ms.
Exiting retry loop.",
+ Thread.currentThread().getName(),
+ retryAttempt,
+ retryIntervalInMs);
+ break;
+ }
+ }
+
+ return new RetryResult<>(result, lastTException, retryAttempt);
+ }
+
private TSStatus deleteDataInternal(TSDeleteDataReq request) throws
TException {
request.setSessionId(sessionId);
return client.deleteData(request);
@@ -860,7 +937,7 @@ public class SessionConnection {
protected void testInsertRecord(TSInsertStringRecordReq request)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
request.setSessionId(sessionId);
return client.testInsertStringRecord(request);
@@ -872,7 +949,7 @@ public class SessionConnection {
protected void testInsertRecord(TSInsertRecordReq request)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
request.setSessionId(sessionId);
return client.testInsertRecord(request);
@@ -884,7 +961,7 @@ public class SessionConnection {
public void testInsertRecords(TSInsertStringRecordsReq request)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
request.setSessionId(sessionId);
return client.testInsertStringRecords(request);
@@ -896,7 +973,7 @@ public class SessionConnection {
public void testInsertRecords(TSInsertRecordsReq request)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
request.setSessionId(sessionId);
return client.testInsertRecords(request);
@@ -908,7 +985,7 @@ public class SessionConnection {
protected void testInsertTablet(TSInsertTabletReq request)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
request.setSessionId(sessionId);
return client.testInsertTablet(request);
@@ -920,7 +997,7 @@ public class SessionConnection {
protected void testInsertTablets(TSInsertTabletsReq request)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
request.setSessionId(sessionId);
return client.testInsertTablets(request);
@@ -980,7 +1057,7 @@ public class SessionConnection {
protected void createSchemaTemplate(TSCreateSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
request.setSessionId(sessionId);
return client.createSchemaTemplate(request);
@@ -992,7 +1069,7 @@ public class SessionConnection {
protected void appendSchemaTemplate(TSAppendSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
request.setSessionId(sessionId);
return client.appendSchemaTemplate(request);
@@ -1004,7 +1081,7 @@ public class SessionConnection {
protected void pruneSchemaTemplate(TSPruneSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
request.setSessionId(sessionId);
return client.pruneSchemaTemplate(request);
@@ -1016,11 +1093,12 @@ public class SessionConnection {
protected TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req)
throws StatementExecutionException, IoTDBConnectionException {
final TSQueryTemplateResp execResp =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
req.setSessionId(sessionId);
return client.querySchemaTemplate(req);
- })
+ },
+ TSQueryTemplateResp::getStatus)
.getResult();
RpcUtils.verifySuccess(execResp.getStatus());
return execResp;
@@ -1029,7 +1107,7 @@ public class SessionConnection {
protected void setSchemaTemplate(TSSetSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
request.setSessionId(sessionId);
return client.setSchemaTemplate(request);
@@ -1041,7 +1119,7 @@ public class SessionConnection {
protected void unsetSchemaTemplate(TSUnsetSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
request.setSessionId(sessionId);
return client.unsetSchemaTemplate(request);
@@ -1053,7 +1131,7 @@ public class SessionConnection {
protected void dropSchemaTemplate(TSDropSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
request.setSessionId(sessionId);
return client.dropSchemaTemplate(request);
@@ -1066,7 +1144,7 @@ public class SessionConnection {
TCreateTimeseriesUsingSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
final TSStatus status =
- callWithReconnect(
+ callWithRetryAndReconnect(
() -> {
request.setSessionId(sessionId);
return client.createTimeseriesUsingSchemaTemplate(request);
@@ -1078,7 +1156,9 @@ public class SessionConnection {
protected TSBackupConfigurationResp getBackupConfiguration()
throws IoTDBConnectionException, StatementExecutionException {
final TSBackupConfigurationResp execResp =
- callWithReconnect(() -> client.getBackupConfiguration()).getResult();
+ callWithRetryAndReconnect(
+ () -> client.getBackupConfiguration(),
TSBackupConfigurationResp::getStatus)
+ .getResult();
RpcUtils.verifySuccess(execResp.getStatus());
return execResp;
}
@@ -1104,7 +1184,9 @@ public class SessionConnection {
}
public TSConnectionInfoResp fetchAllConnections() throws
IoTDBConnectionException {
- return callWithReconnect(() ->
client.fetchAllConnectionsInfo()).getResult();
+ return callWithRetryAndReconnect(
+ () -> client.fetchAllConnectionsInfo(), resp -> false, resp ->
false)
+ .getResult();
}
public boolean isEnableRedirect() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 39f07c4c62d..404ed022966 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -375,8 +375,6 @@ public class Coordinator {
statement.toRelationalStatement(queryContext),
sqlParser,
metadata,
- executor,
- writeOperationExecutor,
scheduledExecutor,
SYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
@@ -463,8 +461,6 @@ public class Coordinator {
statement,
sqlParser,
metadata,
- executor,
- writeOperationExecutor,
scheduledExecutor,
SYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
index 76f88bb18ca..80b17a2a45c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -590,9 +590,19 @@ class AutoCreateSchemaExecutor {
Map<PartialPath, Pair<Boolean, MeasurementGroup>>
devicesNeedAutoCreateTimeSeries,
MPPQueryContext context) {
- List<MeasurementPath> measurementPathList =
+ // Deep copy to avoid changes to the original map
+ final List<MeasurementPath> measurementPathList =
executeInternalCreateTimeseriesStatement(
- new
InternalCreateMultiTimeSeriesStatement(devicesNeedAutoCreateTimeSeries),
context);
+ new InternalCreateMultiTimeSeriesStatement(
+ devicesNeedAutoCreateTimeSeries.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ entry ->
+ new Pair<>(
+ entry.getValue().getLeft(),
+ entry.getValue().getRight().deepCopy())))),
+ context);
schemaTree.appendMeasurementPaths(measurementPathList);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 16f5420413e..2153a0ccf4b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.queryengine.plan.analyze.schema;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
-import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.exception.QuerySchemaFetchFailedException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
@@ -251,11 +251,11 @@ class ClusterSchemaFetchExecutor {
try {
ExecutionResult executionResult = executionStatement(queryId,
fetchStatement, context);
if (executionResult.status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new IoTDBRuntimeException(
+ throw new QuerySchemaFetchFailedException(
String.format("Fetch Schema failed, because %s",
executionResult.status.getMessage()),
executionResult.status.getCode());
}
- try (SetThreadName threadName = new
SetThreadName(executionResult.queryId.getId())) {
+ try (SetThreadName ignored = new
SetThreadName(executionResult.queryId.getId())) {
ClusterSchemaTree result = new ClusterSchemaTree();
Set<String> databaseSet = new HashSet<>();
while (coordinator.getQueryExecution(queryId).hasNextResult()) {
@@ -266,7 +266,8 @@ class ClusterSchemaFetchExecutor {
tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
} catch (IoTDBException e) {
t = e;
- throw new RuntimeException("Fetch Schema failed. ", e);
+ throw new QuerySchemaFetchFailedException(
+ String.format("Fetch Schema failed: %s", e.getMessage()),
e.getErrorCode());
}
if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
break;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
index 3008e03237e..536a06bb09c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -53,7 +53,6 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class ClusterSchemaFetcher implements ISchemaFetcher {
-
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final Coordinator coordinator = Coordinator.getInstance();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
index 88bdeb87a38..eaaacad64ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
@@ -148,8 +148,6 @@ public class TreeModelPlanner implements IPlanner {
stateMachine,
distributedPlan.getInstances(),
context.getQueryType(),
- executor,
- writeOperationExecutor,
scheduledExecutor,
syncInternalServiceClientManager,
asyncInternalServiceClientManager);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metadata/write/MeasurementGroup.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metadata/write/MeasurementGroup.java
index bce60638e1e..8667f6e21e1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metadata/write/MeasurementGroup.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metadata/write/MeasurementGroup.java
@@ -45,7 +45,7 @@ public class MeasurementGroup {
private List<Map<String, String>> tagsList;
private List<Map<String, String>> attributesList;
- private final transient Set<String> measurementSet = new HashSet<>();
+ private transient Set<String> measurementSet = new HashSet<>();
public List<String> getMeasurements() {
return measurements;
@@ -394,6 +394,29 @@ public class MeasurementGroup {
}
}
+ // Copies every list and the measurement set (Map elements of propsList/tagsList/attributesList are shared, not cloned), so list-level edits such as "removeMeasurements" on one group are intended not to affect the other
+ public MeasurementGroup deepCopy() {
+ final MeasurementGroup result = new MeasurementGroup();
+ result.measurements =
+ Objects.nonNull(this.measurements) ? new
ArrayList<>(this.measurements) : null;
+ result.dataTypes = Objects.nonNull(this.dataTypes) ? new
ArrayList<>(this.dataTypes) : null;
+ result.encodings = Objects.nonNull(this.encodings) ? new
ArrayList<>(this.encodings) : null;
+ result.compressors =
+ Objects.nonNull(this.compressors) ? new ArrayList<>(this.compressors)
: null;
+ result.aliasList = Objects.nonNull(this.aliasList) ? new
ArrayList<>(this.aliasList) : null;
+ result.propsList = Objects.nonNull(this.propsList) ? new
ArrayList<>(this.propsList) : null;
+ result.tagsList = Objects.nonNull(this.tagsList) ? new
ArrayList<>(this.tagsList) : null;
+ result.attributesList =
+ Objects.nonNull(this.attributesList) ? new
ArrayList<>(this.attributesList) : null;
+ result.measurementSet = new HashSet<>(measurementSet);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.nonNull(measurements) ? measurements.toString() : "null";
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
index f8d39ddccf0..a5765ee538c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
@@ -58,7 +58,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import static
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.DISTRIBUTION_PLANNER;
@@ -78,8 +77,6 @@ public class TableModelPlanner implements IPlanner {
private final WarningCollector warningCollector = WarningCollector.NOOP;
- private final ExecutorService executor;
- private final ExecutorService writeOperationExecutor;
private final ScheduledExecutorService scheduledExecutor;
private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
@@ -94,8 +91,6 @@ public class TableModelPlanner implements IPlanner {
final Statement statement,
final SqlParser sqlParser,
final Metadata metadata,
- final ExecutorService executor,
- final ExecutorService writeOperationExecutor,
final ScheduledExecutorService scheduledExecutor,
final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
syncInternalServiceClientManager,
@@ -109,8 +104,6 @@ public class TableModelPlanner implements IPlanner {
this.statement = statement;
this.sqlParser = sqlParser;
this.metadata = metadata;
- this.executor = executor;
- this.writeOperationExecutor = writeOperationExecutor;
this.scheduledExecutor = scheduledExecutor;
this.syncInternalServiceClientManager = syncInternalServiceClientManager;
this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
@@ -197,8 +190,6 @@ public class TableModelPlanner implements IPlanner {
stateMachine,
distributedPlan.getInstances(),
context.getQueryType(),
- executor,
- writeOperationExecutor,
scheduledExecutor,
syncInternalServiceClientManager,
asyncInternalServiceClientManager);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
index 230eb27941e..fc1e3d049d9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/AsyncPlanNodeSender.java
@@ -109,19 +109,24 @@ public class AsyncPlanNodeSender {
}
}
- public List<TSStatus> getFailureStatusList() {
- List<TSStatus> failureStatusList = new ArrayList<>();
+ public List<FailedFragmentInstanceWithStatus>
getFailedInstancesWithStatuses() {
+ List<FailedFragmentInstanceWithStatus>
failureFragmentInstanceWithStatusList =
+ new ArrayList<>();
TSStatus status;
for (Map.Entry<Integer, TSendSinglePlanNodeResp> entry :
instanceId2RespMap.entrySet()) {
status = entry.getValue().getStatus();
+ final FragmentInstance instance = instances.get(entry.getKey());
if (!entry.getValue().accepted) {
if (status == null) {
LOGGER.warn(
"dispatch write failed. message: {}, node {}",
entry.getValue().message,
instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
- failureStatusList.add(
- RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_ERROR,
entry.getValue().getMessage()));
+ failureFragmentInstanceWithStatusList.add(
+ new FailedFragmentInstanceWithStatus(
+ instance,
+ RpcUtils.getStatus(
+ TSStatusCode.WRITE_PROCESS_ERROR,
entry.getValue().getMessage())));
} else {
LOGGER.warn(
"dispatch write failed. status: {}, code: {}, message: {}, node
{}",
@@ -129,16 +134,18 @@ public class AsyncPlanNodeSender {
TSStatusCode.representOf(status.code),
entry.getValue().message,
instances.get(entry.getKey()).getHostDataNode().getInternalEndPoint());
- failureStatusList.add(status);
+ failureFragmentInstanceWithStatusList.add(
+ new FailedFragmentInstanceWithStatus(instance, status));
}
} else {
// some expected and accepted status except SUCCESS_STATUS need to be
returned
if (status != null && status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- failureStatusList.add(status);
+ failureFragmentInstanceWithStatusList.add(
+ new FailedFragmentInstanceWithStatus(instance, status));
}
}
}
- return failureStatusList;
+ return failureFragmentInstanceWithStatusList;
}
public boolean needRetry() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
index b3e9acdc00d..b74dba62c15 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -71,8 +70,6 @@ public class ClusterScheduler implements IScheduler {
QueryStateMachine stateMachine,
List<FragmentInstance> instances,
QueryType queryType,
- ExecutorService executor,
- ExecutorService writeOperationExecutor,
ScheduledExecutorService scheduledExecutor,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
syncInternalServiceClientManager,
IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
@@ -84,8 +81,6 @@ public class ClusterScheduler implements IScheduler {
new FragmentInstanceDispatcherImpl(
queryType,
queryContext,
- executor,
- writeOperationExecutor,
syncInternalServiceClientManager,
asyncInternalServiceClientManager);
if (queryType == QueryType.READ) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FailedFragmentInstanceWithStatus.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FailedFragmentInstanceWithStatus.java
new file mode 100644
index 00000000000..8da3bfd2d9e
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FailedFragmentInstanceWithStatus.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.scheduler;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+
+public final class FailedFragmentInstanceWithStatus { // immutable pair: a write FragmentInstance and the status of its failed dispatch
+ private final FragmentInstance instance;
+ private final TSStatus failureStatus;
+
+ public FailedFragmentInstanceWithStatus(FragmentInstance instance, TSStatus
failureStatus) {
+ this.instance = instance;
+ this.failureStatus = failureStatus;
+ }
+
+ public FragmentInstance getInstance() {
+ return instance;
+ }
+
+ public TSStatus getFailureStatus() {
+ return failureStatus;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 5aef2e3176c..111a40470ad 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.scheduler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
@@ -56,6 +57,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.thrift.TException;
+import org.apache.tsfile.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,9 +65,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.ExecutorService;
+import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static
org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet.DISPATCH_READ;
@@ -76,9 +79,6 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
LoggerFactory.getLogger(FragmentInstanceDispatcherImpl.class);
private static final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
-
- private final ExecutorService executor;
- private final ExecutorService writeOperationExecutor;
private final QueryType type;
private final MPPQueryContext queryContext;
private final String localhostIpAddr;
@@ -97,22 +97,24 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
private static final String UNEXPECTED_ERRORS = "Unexpected errors: ";
+ private final long maxRetryDurationInNs;
+
public FragmentInstanceDispatcherImpl(
QueryType type,
MPPQueryContext queryContext,
- ExecutorService executor,
- ExecutorService writeOperationExecutor,
IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
syncInternalServiceClientManager,
IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
asyncInternalServiceClientManager) {
this.type = type;
this.queryContext = queryContext;
- this.executor = executor;
- this.writeOperationExecutor = writeOperationExecutor;
this.syncInternalServiceClientManager = syncInternalServiceClientManager;
this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
this.localhostIpAddr =
IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
this.localhostInternalPort =
IoTDBDescriptor.getInstance().getConfig().getInternalPort();
+ this.maxRetryDurationInNs = // ms config converted to ns; a non-positive config yields 0, which disables write retries in dispatchWrite
+ COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() > 0
+ ? COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() * 1_000_000L
+ : 0;
}
@Override
@@ -120,7 +122,7 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
if (type == QueryType.READ) {
return dispatchRead(instances);
} else {
- return dispatchWriteAsync(instances);
+ return dispatchWrite(instances);
}
}
@@ -163,138 +165,170 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
}
}
- private Future<FragInstanceDispatchResult>
dispatchWriteSync(List<FragmentInstance> instances) {
- List<TSStatus> failureStatusList = new ArrayList<>();
- for (FragmentInstance instance : instances) {
- try (SetThreadName threadName = new
SetThreadName(instance.getId().getFullId())) {
- dispatchOneInstance(instance);
- } catch (FragmentInstanceDispatchException e) {
- TSStatus failureStatus = e.getFailureStatus();
- if (instances.size() == 1) {
- failureStatusList.add(failureStatus);
- } else {
- if (failureStatus.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- failureStatusList.addAll(failureStatus.getSubStatus());
- } else {
- failureStatusList.add(failureStatus);
- }
- }
- } catch (Throwable t) {
- LOGGER.warn(DISPATCH_FAILED, t);
- failureStatusList.add(
- RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS +
t.getMessage()));
+ /** Entrypoint for dispatching write fragment instances. */
+ private Future<FragInstanceDispatchResult>
dispatchWrite(List<FragmentInstance> instances) {
+ final List<TSStatus> dispatchFailures = new ArrayList<>();
+ int replicaNum = 0;
+
+ // 1. do not dispatch if the host DataNode is unknown or the RegionReplicaSet is empty; record a PLAN_FAILED_NETWORK_PARTITION failure instead
+ final List<FragmentInstance> shouldDispatch = new ArrayList<>();
+ for (final FragmentInstance instance : instances) {
+ if (instance.getHostDataNode() == null
+ || Optional.ofNullable(instance.getRegionReplicaSet())
+ .map(TRegionReplicaSet::getDataNodeLocationsSize)
+ .orElse(0)
+ == 0) {
+ dispatchFailures.add(
+ new
TSStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode()));
+ } else {
+ replicaNum =
+ Math.max(replicaNum,
instance.getRegionReplicaSet().getDataNodeLocationsSize());
+ shouldDispatch.add(instance);
}
}
- if (failureStatusList.isEmpty()) {
+
+ try {
+ // 2. try the dispatch
+ final List<FailedFragmentInstanceWithStatus> failedInstances =
+ dispatchWriteOnce(shouldDispatch);
+
+ // 3. decide if we need retry (we may decide the retry condition
instance-wise, if needed)
+ final boolean shouldRetry =
+ !failedInstances.isEmpty() && maxRetryDurationInNs > 0 && replicaNum
> 1;
+ if (!shouldRetry) {
+ failedInstances.forEach(fi ->
dispatchFailures.add(fi.getFailureStatus()));
+ } else {
+ // 4. retry the instance on other replicas
+ final List<FragmentInstance> retryInstances =
+ failedInstances.stream()
+ .map(FailedFragmentInstanceWithStatus::getInstance)
+ .collect(Collectors.toList());
+ // retry at most replicaNum times (the largest replica count seen), moving to the next replica on each attempt
+ final List<FailedFragmentInstanceWithStatus> failedAfterRetry =
+ dispatchRetryWrite(retryInstances, replicaNum);
+ failedAfterRetry.forEach(fi ->
dispatchFailures.add(fi.getFailureStatus()));
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.error("Interrupted when dispatching write async", e);
+ return immediateFuture(
+ new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " +
e.getMessage())));
+ }
+
+ if (dispatchFailures.isEmpty()) {
return immediateFuture(new FragInstanceDispatchResult(true));
+ }
+ if (instances.size() == 1) {
+ return immediateFuture(new
FragInstanceDispatchResult(dispatchFailures.get(0)));
} else {
- if (instances.size() == 1) {
- return immediateFuture(new
FragInstanceDispatchResult(failureStatusList.get(0)));
- } else {
- return immediateFuture(
- new
FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList)));
+ List<TSStatus> failureStatusList = new ArrayList<>();
+ for (TSStatus dataNodeFailure : dispatchFailures) {
+ if (dataNodeFailure.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+ failureStatusList.addAll(dataNodeFailure.getSubStatus());
+ } else {
+ failureStatusList.add(dataNodeFailure);
+ }
}
+ return immediateFuture(new
FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList)));
}
}
- private Future<FragInstanceDispatchResult>
dispatchWriteAsync(List<FragmentInstance> instances) {
- List<TSStatus> dataNodeFailureList = new ArrayList<>();
- // split local and remote instances
- List<FragmentInstance> localInstances = new ArrayList<>();
- List<FragmentInstance> remoteInstances = new ArrayList<>();
+ /**
+ * Dispatch the given write instances once. It will dispatch the given
instances locally or
+ * remotely depending on each instance's host DataNode (which must be non-null), and returns the failed instances with their statuses.
+ */
+ private List<FailedFragmentInstanceWithStatus>
dispatchWriteOnce(List<FragmentInstance> instances)
+ throws InterruptedException {
+ if (instances.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ final List<FragmentInstance> localInstances = new ArrayList<>();
+ final List<FragmentInstance> remoteInstances = new ArrayList<>();
for (FragmentInstance instance : instances) {
- if (instance.getHostDataNode() == null) {
- dataNodeFailureList.add(
- new
TSStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode()));
- continue;
- }
- TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
- if (isDispatchedToLocal(endPoint)) {
+ if
(isDispatchedToLocal(instance.getHostDataNode().getInternalEndPoint())) {
localInstances.add(instance);
} else {
remoteInstances.add(instance);
}
}
- // async dispatch to remote
- AsyncPlanNodeSender asyncPlanNodeSender =
+
+ final List<FailedFragmentInstanceWithStatus>
failedFragmentInstanceWithStatuses =
+ new ArrayList<>();
+
+ // 1. async dispatch to remote
+ final AsyncPlanNodeSender asyncPlanNodeSender =
new AsyncPlanNodeSender(asyncInternalServiceClientManager,
remoteInstances);
asyncPlanNodeSender.sendAll();
+ // 2. sync dispatch to local
if (!localInstances.isEmpty()) {
- // sync dispatch to local
long localScheduleStartTime = System.nanoTime();
for (FragmentInstance localInstance : localInstances) {
- try (SetThreadName threadName = new
SetThreadName(localInstance.getId().getFullId())) {
+ try (SetThreadName ignored = new
SetThreadName(localInstance.getId().getFullId())) {
dispatchLocally(localInstance);
} catch (FragmentInstanceDispatchException e) {
- dataNodeFailureList.add(e.getFailureStatus());
+ failedFragmentInstanceWithStatuses.add(
+ new FailedFragmentInstanceWithStatus(localInstance,
e.getFailureStatus()));
} catch (Throwable t) {
LOGGER.warn(DISPATCH_FAILED, t);
- dataNodeFailureList.add(
- RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS +
t.getMessage()));
+ failedFragmentInstanceWithStatuses.add(
+ new FailedFragmentInstanceWithStatus(
+ localInstance,
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS +
t.getMessage())));
}
}
PERFORMANCE_OVERVIEW_METRICS.recordScheduleLocalCost(
System.nanoTime() - localScheduleStartTime);
}
- // wait until remote dispatch done
- try {
- asyncPlanNodeSender.waitUntilCompleted();
- final long maxRetryDurationInNs =
- COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() > 0
- ? COMMON_CONFIG.getRemoteWriteMaxRetryDurationInMs() * 1_000_000L
- : 0;
- if (maxRetryDurationInNs > 0 && asyncPlanNodeSender.needRetry()) {
- // retry failed remote FIs
- int retryCount = 0;
- long waitMillis = getRetrySleepTime(retryCount);
- long retryStartTime = System.nanoTime();
-
- while (asyncPlanNodeSender.needRetry()) {
- retryCount++;
- asyncPlanNodeSender.retry();
- // if !(still need retry and current time + next sleep time <
maxRetryDurationInNs)
- if (!(asyncPlanNodeSender.needRetry()
- && (System.nanoTime() - retryStartTime + waitMillis * 1_000_000L)
- < maxRetryDurationInNs)) {
- break;
- }
- // still need to retry, sleep some time before make another retry.
- Thread.sleep(waitMillis);
- PERFORMANCE_OVERVIEW_METRICS.recordRemoteRetrySleepCost(waitMillis *
1_000_000L);
- waitMillis = getRetrySleepTime(retryCount);
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.error("Interrupted when dispatching write async", e);
- return immediateFuture(
- new FragInstanceDispatchResult(
- RpcUtils.getStatus(
- TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " +
e.getMessage())));
- }
+ // 3. wait for remote dispatch results
+ asyncPlanNodeSender.waitUntilCompleted();
- dataNodeFailureList.addAll(asyncPlanNodeSender.getFailureStatusList());
+ // 4. collect remote dispatch results
+
failedFragmentInstanceWithStatuses.addAll(asyncPlanNodeSender.getFailedInstancesWithStatuses());
- if (dataNodeFailureList.isEmpty()) {
- return immediateFuture(new FragInstanceDispatchResult(true));
- }
- if (instances.size() == 1) {
- return immediateFuture(new
FragInstanceDispatchResult(dataNodeFailureList.get(0)));
- } else {
- List<TSStatus> failureStatusList = new ArrayList<>();
- for (TSStatus dataNodeFailure : dataNodeFailureList) {
- if (dataNodeFailure.getCode() ==
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
- failureStatusList.addAll(dataNodeFailure.getSubStatus());
- } else {
- failureStatusList.add(dataNodeFailure);
- }
+ return failedFragmentInstanceWithStatuses;
+ }
+
+ private List<FailedFragmentInstanceWithStatus> dispatchRetryWrite(
+ List<FragmentInstance> retriedInstances, int maxRetryAttempts) throws
InterruptedException {
+ Preconditions.checkArgument(maxRetryAttempts > 0);
+
+ final long retryStartTime = System.nanoTime();
+ int retryAttempt = 0;
+ List<FragmentInstance> nextDispatch = new ArrayList<>(retriedInstances);
+ List<FailedFragmentInstanceWithStatus> failedFragmentInstanceWithStatuses =
+ Collections.emptyList();
+
+ while (retryAttempt < maxRetryAttempts) {
+ // 1. let's retry on next replica location (return value unused; presumably getNextRetriedHostDataNode switches getHostDataNode() to the next replica)
+ nextDispatch.forEach(FragmentInstance::getNextRetriedHostDataNode);
+
+ // 2. dispatch the instances
+ failedFragmentInstanceWithStatuses = dispatchWriteOnce(nextDispatch);
+
+ // 3. stop on success, after the last allowed attempt (no pointless sleep), or if elapsed time plus the next sleep (ms -> ns) would reach the deadline; compare nanoTime differences only, per its contract
+ final long waitMillis = getRetrySleepTime(retryAttempt);
+ if (failedFragmentInstanceWithStatuses.isEmpty() || retryAttempt + 1 >= maxRetryAttempts
+ || System.nanoTime() - retryStartTime + waitMillis * 1_000_000L >= maxRetryDurationInNs) {
+ break;
}
- return immediateFuture(new
FragInstanceDispatchResult(RpcUtils.getStatus(failureStatusList)));
+
+ // 4. sleep and do the next retry
+ Thread.sleep(waitMillis);
+ PERFORMANCE_OVERVIEW_METRICS.recordRemoteRetrySleepCost(waitMillis *
1_000_000L);
+ retryAttempt++;
+ nextDispatch =
+ failedFragmentInstanceWithStatuses.stream()
+ .map(FailedFragmentInstanceWithStatus::getInstance)
+ .collect(Collectors.toList());
}
+
+ return failedFragmentInstanceWithStatuses;
}
private long getRetrySleepTime(int retryTimes) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 69e48494f7a..00df318bdf2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.utils;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.exception.QuerySchemaFetchFailedException;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.QueryInBatchStatementException;
import org.apache.iotdb.db.exception.StorageGroupNotReadyException;
@@ -156,7 +157,8 @@ public class ErrorHandlingUtils {
return RpcUtils.getStatus(
TSStatusCode.QUERY_NOT_ALLOWED, INFO_NOT_ALLOWED_IN_BATCH_ERROR +
rootCause.getMessage());
} else if (t instanceof RootFIPlacementException
- || t instanceof ReplicaSetUnreachableException) {
+ || t instanceof ReplicaSetUnreachableException
+ || t instanceof QuerySchemaFetchFailedException) {
return RpcUtils.getStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION,
rootCause.getMessage());
} else if (t instanceof IoTDBException) {
return RpcUtils.getStatus(((IoTDBException) t).getErrorCode(),
rootCause.getMessage());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QuerySchemaFetchFailedException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QuerySchemaFetchFailedException.java
new file mode 100644
index 00000000000..096042a2bf2
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QuerySchemaFetchFailedException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.exception;
+
+/**
+ * Thrown when fetching schema through the Coordinator fails during query execution, most likely
+ * because of a network partition; ErrorHandlingUtils reports it as PLAN_FAILED_NETWORK_PARTITION.
+ */
+public class QuerySchemaFetchFailedException extends IoTDBRuntimeException {
+ public QuerySchemaFetchFailedException(String message, int errorCode) {
+ super(message, errorCode);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
index 4ffa36604d6..70d3d3acb2c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
@@ -58,7 +58,6 @@ public class StatusUtils {
NEED_RETRY.add(TSStatusCode.WAL_ERROR.getStatusCode());
NEED_RETRY.add(TSStatusCode.DISK_SPACE_INSUFFICIENT.getStatusCode());
NEED_RETRY.add(TSStatusCode.QUERY_PROCESS_ERROR.getStatusCode());
- NEED_RETRY.add(TSStatusCode.INTERNAL_REQUEST_TIME_OUT.getStatusCode());
NEED_RETRY.add(TSStatusCode.INTERNAL_REQUEST_RETRY_ERROR.getStatusCode());
NEED_RETRY.add(TSStatusCode.CREATE_REGION_ERROR.getStatusCode());
NEED_RETRY.add(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode());