This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 0626d34f252 [To dev/1.3] Throw CANNOT_FETCH_FI_STATE(722) instead of 301/305 while DN restarting
0626d34f252 is described below
commit 0626d34f25274b25720f3d7052c0b7c696b1fc0c
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Jun 25 10:02:33 2025 +0800
[To dev/1.3] Throw CANNOT_FETCH_FI_STATE(722) instead of 301/305 while DN restarting
---
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 11 +++
.../queryengine/execution/QueryStateMachine.java | 30 +++----
.../execution/fragment/FragmentInstanceInfo.java | 4 +
.../execution/fragment/FragmentInstanceState.java | 2 +-
.../queryengine/plan/execution/QueryExecution.java | 8 +-
.../scheduler/FixedRateFragInsStateTracker.java | 99 +++++++++++++++-------
.../apache/iotdb/db/utils/ErrorHandlingUtils.java | 11 ++-
7 files changed, 112 insertions(+), 53 deletions(-)
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 7cde275ce02..28569ebd7f2 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -118,6 +118,17 @@ public enum TSStatusCode {
QUERY_WAS_KILLED(715),
EXPLAIN_ANALYZE_FETCH_ERROR(716),
TOO_MANY_CONCURRENT_QUERIES_ERROR(717),
+ OPERATOR_NOT_FOUND(718),
+
+ QUERY_EXECUTION_MEMORY_NOT_ENOUGH(719),
+ QUERY_TIMEOUT(720),
+ PLAN_FAILED_NETWORK_PARTITION(721),
+ CANNOT_FETCH_FI_STATE(722),
+
+ // Arithmetic
+ NUMERIC_VALUE_OUT_OF_RANGE(750),
+ DIVISION_BY_ZERO(751),
+ DATE_OUT_OF_RANGE(752),
// Authentication
INIT_AUTH_ERROR(800),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
index ab9201787a6..581b015eeb6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.queryengine.plan.execution.QueryExecution;
import com.google.common.util.concurrent.ListenableFuture;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkArgument;
@@ -48,19 +47,13 @@ import static org.apache.iotdb.db.queryengine.execution.QueryState.RUNNING;
public class QueryStateMachine {
private final StateMachine<QueryState> queryState;
- // The executor will be used in all the state machines belonged to this query.
- private Executor stateMachineExecutor;
private Throwable failureException;
private TSStatus failureStatus;
public QueryStateMachine(QueryId queryId, ExecutorService executor) {
- this.stateMachineExecutor = executor;
this.queryState =
new StateMachine<>(
- queryId.toString(),
- this.stateMachineExecutor,
- QUEUED,
- QueryState.TERMINAL_INSTANCE_STATES);
+ queryId.toString(), executor, QUEUED, QueryState.TERMINAL_INSTANCE_STATES);
}
public void addStateChangeListener(
@@ -109,9 +102,10 @@ public class QueryStateMachine {
}
public void transitionToCanceled(Throwable throwable, TSStatus failureStatus) {
- this.failureException = throwable;
- this.failureStatus = failureStatus;
- transitionToDoneState(CANCELED);
+ if (transitionToDoneState(CANCELED)) {
+ this.failureException = throwable;
+ this.failureStatus = failureStatus;
+ }
}
public void transitionToAborted() {
@@ -123,20 +117,22 @@ public class QueryStateMachine {
}
public void transitionToFailed(Throwable throwable) {
- this.failureException = throwable;
- transitionToDoneState(FAILED);
+ if (transitionToDoneState(FAILED)) {
+ this.failureException = throwable;
+ }
}
public void transitionToFailed(TSStatus failureStatus) {
- this.failureStatus = failureStatus;
- transitionToDoneState(FAILED);
+ if (transitionToDoneState(FAILED)) {
+ this.failureStatus = failureStatus;
+ }
}
- private void transitionToDoneState(QueryState doneState) {
+ private boolean transitionToDoneState(QueryState doneState) {
requireNonNull(doneState, "doneState is null");
checkArgument(doneState.isDone(), "doneState %s is not a done state", doneState);
- queryState.setIf(doneState, currentState -> !currentState.isDone());
+ return queryState.setIf(doneState, currentState -> !currentState.isDone());
}
public String getFailureMessage() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
index 4717a23f279..a544aebe6df 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
@@ -77,6 +77,10 @@ public class FragmentInstanceInfo implements DataSet {
return message;
}
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
public Optional<TSStatus> getErrorCode() {
return Optional.ofNullable(errorCode);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java
index 092b1be3816..2bcb12544d9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java
@@ -47,7 +47,7 @@ public enum FragmentInstanceState {
/** Instance execution failed. */
FAILED(true, true),
/** Instance is not found. */
- NO_SUCH_INSTANCE(false, true);
+ NO_SUCH_INSTANCE(true, true);
public static final Set<FragmentInstanceState> TERMINAL_INSTANCE_STATES =
Stream.of(FragmentInstanceState.values())
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 29dbd7e3492..28f9753e87a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -605,8 +605,12 @@ public class QueryExecution implements IQueryExecution {
// If RETRYING is triggered by this QueryExecution, the stateMachine.getFailureStatus() is also
// not null. We should only return the failure status when QueryExecution is in Done state.
- if (state.isDone() && stateMachine.getFailureStatus() != null) {
- tsstatus = stateMachine.getFailureStatus();
+ if (state.isDone()) {
+ if (analysis.getFailStatus() != null) {
+ tsstatus = analysis.getFailStatus();
+ } else if (stateMachine.getFailureStatus() != null) {
+ tsstatus = stateMachine.getFailureStatus();
+ }
}
// collect redirect info to client for writing
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
index e18c628e421..b848db6b6d9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -45,13 +46,15 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState.NO_SUCH_INSTANCE;
+
public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
private static final Logger logger =
LoggerFactory.getLogger(FixedRateFragInsStateTracker.class);
private static final long SAME_STATE_PRINT_RATE_IN_MS = 10L * 60 * 1000;
- // TODO: (xingtanzjr) consider how much Interval is OK for state tracker
+ // consider how much Interval is OK for state tracker
private static final long STATE_FETCH_INTERVAL_IN_MS = 500;
private ScheduledFuture<?> trackTask;
private final Map<FragmentInstanceId, InstanceStateMetrics> instanceStateMap;
@@ -112,8 +115,8 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
aborted = true;
if (trackTask != null) {
boolean cancelResult = trackTask.cancel(true);
- // TODO: (xingtanzjr) a strange case here is that sometimes
- // the cancelResult is false but the trackTask is definitely cancelled
+ // a strange case here is that sometimes the cancelResult is false but the trackTask is
+ // definitely cancelled
if (!cancelResult) {
logger.debug("cancel state tracking task failed. {}", trackTask.isCancelled());
}
@@ -144,8 +147,24 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
updateQueryState(instance.getId(), instanceInfo);
}
} catch (ClientManagerException | TException e) {
- // TODO: do nothing ?
- logger.warn("error happened while fetching query state", e);
+ // network exception, should retry
+ InstanceStateMetrics metrics =
+ instanceStateMap.computeIfAbsent(
+ instance.getId(), k -> new InstanceStateMetrics(instance.isRoot()));
+ if (metrics.reachMaxRetryCount()) {
+ // if reach max retry count, we think that the DN is down, and FI in that node won't
+ // exist
+ FragmentInstanceInfo instanceInfo = new FragmentInstanceInfo(NO_SUCH_INSTANCE);
+ instanceInfo.setMessage(
+ String.format(
+ "Failed to fetch state, has retried %s times",
+ InstanceStateMetrics.MAX_STATE_FETCH_RETRY_COUNT));
+ updateQueryState(instance.getId(), instanceInfo);
+ } else {
+ // if not reaching max retry count, add retry count, and wait for next fetching schedule
+ metrics.addRetryCount();
+ logger.warn("error happened while fetching query state", e);
+ }
}
}
}
@@ -153,44 +172,46 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
private void updateQueryState(FragmentInstanceId instanceId, FragmentInstanceInfo instanceInfo) {
// no such instance may be caused by DN restarting
- if (instanceInfo.getState() == FragmentInstanceState.NO_SUCH_INSTANCE) {
+ if (instanceInfo.getState() == NO_SUCH_INSTANCE) {
stateMachine.transitionToFailed(
- new RuntimeException(
+ new IoTDBException(
String.format(
"FragmentInstance[%s] is failed. %s, may be caused by DN restarting.",
- instanceId, instanceInfo.getMessage())));
- }
- if (instanceInfo.getState().isFailed()) {
- if (instanceInfo.getFailureInfoList() == null
+ instanceId, instanceInfo.getMessage()),
+ TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode(),
+ true));
+ } else if (instanceInfo.getState().isFailed()) {
+ if (instanceInfo.getErrorCode().isPresent()) {
+ stateMachine.transitionToFailed(
+ new IoTDBException(
+ instanceInfo.getErrorCode().get().getMessage(),
+ instanceInfo.getErrorCode().get().getCode()));
+ } else if (instanceInfo.getFailureInfoList() == null
|| instanceInfo.getFailureInfoList().isEmpty()) {
stateMachine.transitionToFailed(
new RuntimeException(
String.format(
"FragmentInstance[%s] is failed. %s", instanceId, instanceInfo.getMessage())));
- } else if (instanceInfo.getErrorCode().isPresent()) {
- stateMachine.transitionToFailed(
- new IoTDBException(
- instanceInfo.getErrorCode().get().getMessage(),
- instanceInfo.getErrorCode().get().getCode()));
} else {
stateMachine.transitionToFailed(instanceInfo.getFailureInfoList().get(0).toException());
}
- }
- boolean queryFinished = false;
- List<InstanceStateMetrics> rootInstanceStateMetricsList =
- instanceStateMap.values().stream()
- .filter(instanceStateMetrics -> instanceStateMetrics.isRootInstance)
- .collect(Collectors.toList());
- if (!rootInstanceStateMetricsList.isEmpty()) {
- queryFinished =
- rootInstanceStateMetricsList.stream()
- .allMatch(
- instanceStateMetrics ->
- instanceStateMetrics.lastState == FragmentInstanceState.FINISHED);
- }
+ } else {
+ boolean queryFinished = false;
+ List<InstanceStateMetrics> rootInstanceStateMetricsList =
+ instanceStateMap.values().stream()
+ .filter(instanceStateMetrics -> instanceStateMetrics.isRootInstance)
+ .collect(Collectors.toList());
+ if (!rootInstanceStateMetricsList.isEmpty()) {
+ queryFinished =
+ rootInstanceStateMetricsList.stream()
+ .allMatch(
+ instanceStateMetrics ->
+ instanceStateMetrics.lastState == FragmentInstanceState.FINISHED);
+ }
- if (queryFinished) {
- stateMachine.transitionToFinished();
+ if (queryFinished) {
+ stateMachine.transitionToFinished();
+ }
}
}
@@ -203,23 +224,39 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
}
private static class InstanceStateMetrics {
+ private static final long MAX_STATE_FETCH_RETRY_COUNT = 5;
private final boolean isRootInstance;
private FragmentInstanceState lastState;
private long durationToLastPrintInMS;
+ // we only record the continuous retry count
+ private int retryCount;
private InstanceStateMetrics(boolean isRootInstance) {
this.isRootInstance = isRootInstance;
this.lastState = null;
this.durationToLastPrintInMS = 0L;
+ this.retryCount = 0;
}
private void reset(FragmentInstanceState newState) {
this.lastState = newState;
this.durationToLastPrintInMS = 0L;
+ // each successful fetch, we need to reset the retry count
+ this.retryCount = 0;
+ }
+
+ private void addRetryCount() {
+ this.retryCount++;
+ }
+
+ private boolean reachMaxRetryCount() {
+ return retryCount >= MAX_STATE_FETCH_RETRY_COUNT;
}
private void addDuration(long duration) {
durationToLastPrintInMS += duration;
+ // each successful fetch, we need to reset the retry count
+ this.retryCount = 0;
}
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 679288e7840..9fd7053ba92 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -104,8 +104,15 @@ public class ErrorHandlingUtils {
if (status.getCode() == TSStatusCode.SQL_PARSE_ERROR.getStatusCode()
|| status.getCode() == TSStatusCode.SEMANTIC_ERROR.getStatusCode()
|| status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode()
- || status.getCode() == TSStatusCode.ILLEGAL_PATH.getStatusCode()) {
- LOGGER.warn(message);
+ || status.getCode() == TSStatusCode.ILLEGAL_PATH.getStatusCode()
+ || status.getCode() == TSStatusCode.NUMERIC_VALUE_OUT_OF_RANGE.getStatusCode()
+ || status.getCode() == TSStatusCode.DIVISION_BY_ZERO.getStatusCode()
+ || status.getCode() == TSStatusCode.DATE_OUT_OF_RANGE.getStatusCode()
+ || status.getCode() == TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode()
+ || status.getCode() == TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode()
+ || status.getCode() == TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode()
+ || status.getCode() == TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode()) {
+ LOGGER.info(message);
} else {
LOGGER.warn(message, e);
}