This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 0626d34f252 [To dev/1.3] Throw CANNOT_FETCH_FI_STATE(722) instead of 301/305 while DN restarting
0626d34f252 is described below

commit 0626d34f25274b25720f3d7052c0b7c696b1fc0c
Author: Jackie Tien <[email protected]>
AuthorDate: Wed Jun 25 10:02:33 2025 +0800

    [To dev/1.3] Throw CANNOT_FETCH_FI_STATE(722) instead of 301/305 while DN restarting
---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    | 11 +++
 .../queryengine/execution/QueryStateMachine.java   | 30 +++----
 .../execution/fragment/FragmentInstanceInfo.java   |  4 +
 .../execution/fragment/FragmentInstanceState.java  |  2 +-
 .../queryengine/plan/execution/QueryExecution.java |  8 +-
 .../scheduler/FixedRateFragInsStateTracker.java    | 99 +++++++++++++++-------
 .../apache/iotdb/db/utils/ErrorHandlingUtils.java  | 11 ++-
 7 files changed, 112 insertions(+), 53 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 7cde275ce02..28569ebd7f2 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -118,6 +118,17 @@ public enum TSStatusCode {
   QUERY_WAS_KILLED(715),
   EXPLAIN_ANALYZE_FETCH_ERROR(716),
   TOO_MANY_CONCURRENT_QUERIES_ERROR(717),
+  OPERATOR_NOT_FOUND(718),
+
+  QUERY_EXECUTION_MEMORY_NOT_ENOUGH(719),
+  QUERY_TIMEOUT(720),
+  PLAN_FAILED_NETWORK_PARTITION(721),
+  CANNOT_FETCH_FI_STATE(722),
+
+  // Arithmetic
+  NUMERIC_VALUE_OUT_OF_RANGE(750),
+  DIVISION_BY_ZERO(751),
+  DATE_OUT_OF_RANGE(752),
 
   // Authentication
   INIT_AUTH_ERROR(800),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
index ab9201787a6..581b015eeb6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
@@ -26,7 +26,6 @@ import 
org.apache.iotdb.db.queryengine.plan.execution.QueryExecution;
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -48,19 +47,13 @@ import static 
org.apache.iotdb.db.queryengine.execution.QueryState.RUNNING;
 public class QueryStateMachine {
   private final StateMachine<QueryState> queryState;
 
-  // The executor will be used in all the state machines belonged to this 
query.
-  private Executor stateMachineExecutor;
   private Throwable failureException;
   private TSStatus failureStatus;
 
   public QueryStateMachine(QueryId queryId, ExecutorService executor) {
-    this.stateMachineExecutor = executor;
     this.queryState =
         new StateMachine<>(
-            queryId.toString(),
-            this.stateMachineExecutor,
-            QUEUED,
-            QueryState.TERMINAL_INSTANCE_STATES);
+            queryId.toString(), executor, QUEUED, 
QueryState.TERMINAL_INSTANCE_STATES);
   }
 
   public void addStateChangeListener(
@@ -109,9 +102,10 @@ public class QueryStateMachine {
   }
 
   public void transitionToCanceled(Throwable throwable, TSStatus 
failureStatus) {
-    this.failureException = throwable;
-    this.failureStatus = failureStatus;
-    transitionToDoneState(CANCELED);
+    if (transitionToDoneState(CANCELED)) {
+      this.failureException = throwable;
+      this.failureStatus = failureStatus;
+    }
   }
 
   public void transitionToAborted() {
@@ -123,20 +117,22 @@ public class QueryStateMachine {
   }
 
   public void transitionToFailed(Throwable throwable) {
-    this.failureException = throwable;
-    transitionToDoneState(FAILED);
+    if (transitionToDoneState(FAILED)) {
+      this.failureException = throwable;
+    }
   }
 
   public void transitionToFailed(TSStatus failureStatus) {
-    this.failureStatus = failureStatus;
-    transitionToDoneState(FAILED);
+    if (transitionToDoneState(FAILED)) {
+      this.failureStatus = failureStatus;
+    }
   }
 
-  private void transitionToDoneState(QueryState doneState) {
+  private boolean transitionToDoneState(QueryState doneState) {
     requireNonNull(doneState, "doneState is null");
     checkArgument(doneState.isDone(), "doneState %s is not a done state", 
doneState);
 
-    queryState.setIf(doneState, currentState -> !currentState.isDone());
+    return queryState.setIf(doneState, currentState -> !currentState.isDone());
   }
 
   public String getFailureMessage() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
index 4717a23f279..a544aebe6df 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceInfo.java
@@ -77,6 +77,10 @@ public class FragmentInstanceInfo implements DataSet {
     return message;
   }
 
+  public void setMessage(String message) {
+    this.message = message;
+  }
+
   public Optional<TSStatus> getErrorCode() {
     return Optional.ofNullable(errorCode);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java
index 092b1be3816..2bcb12544d9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java
@@ -47,7 +47,7 @@ public enum FragmentInstanceState {
   /** Instance execution failed. */
   FAILED(true, true),
   /** Instance is not found. */
-  NO_SUCH_INSTANCE(false, true);
+  NO_SUCH_INSTANCE(true, true);
 
   public static final Set<FragmentInstanceState> TERMINAL_INSTANCE_STATES =
       Stream.of(FragmentInstanceState.values())
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 29dbd7e3492..28f9753e87a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -605,8 +605,12 @@ public class QueryExecution implements IQueryExecution {
 
     // If RETRYING is triggered by this QueryExecution, the 
stateMachine.getFailureStatus() is also
     // not null. We should only return the failure status when QueryExecution 
is in Done state.
-    if (state.isDone() && stateMachine.getFailureStatus() != null) {
-      tsstatus = stateMachine.getFailureStatus();
+    if (state.isDone()) {
+      if (analysis.getFailStatus() != null) {
+        tsstatus = analysis.getFailStatus();
+      } else if (stateMachine.getFailureStatus() != null) {
+        tsstatus = stateMachine.getFailureStatus();
+      }
     }
 
     // collect redirect info to client for writing
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
index e18c628e421..b848db6b6d9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceInfo;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.utils.SetThreadName;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -45,13 +46,15 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState.NO_SUCH_INSTANCE;
+
 public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
 
   private static final Logger logger = 
LoggerFactory.getLogger(FixedRateFragInsStateTracker.class);
 
   private static final long SAME_STATE_PRINT_RATE_IN_MS = 10L * 60 * 1000;
 
-  // TODO: (xingtanzjr) consider how much Interval is OK for state tracker
+  // consider how much Interval is OK for state tracker
   private static final long STATE_FETCH_INTERVAL_IN_MS = 500;
   private ScheduledFuture<?> trackTask;
   private final Map<FragmentInstanceId, InstanceStateMetrics> instanceStateMap;
@@ -112,8 +115,8 @@ public class FixedRateFragInsStateTracker extends 
AbstractFragInsStateTracker {
     aborted = true;
     if (trackTask != null) {
       boolean cancelResult = trackTask.cancel(true);
-      // TODO: (xingtanzjr) a strange case here is that sometimes
-      // the cancelResult is false but the trackTask is definitely cancelled
+      // a strange case here is that sometimes the cancelResult is false but 
the trackTask is
+      // definitely cancelled
       if (!cancelResult) {
         logger.debug("cancel state tracking task failed. {}", 
trackTask.isCancelled());
       }
@@ -144,8 +147,24 @@ public class FixedRateFragInsStateTracker extends 
AbstractFragInsStateTracker {
             updateQueryState(instance.getId(), instanceInfo);
           }
         } catch (ClientManagerException | TException e) {
-          // TODO: do nothing ?
-          logger.warn("error happened while fetching query state", e);
+          // network exception, should retry
+          InstanceStateMetrics metrics =
+              instanceStateMap.computeIfAbsent(
+                  instance.getId(), k -> new 
InstanceStateMetrics(instance.isRoot()));
+          if (metrics.reachMaxRetryCount()) {
+            // if reach max retry count, we think that the DN is down, and FI 
in that node won't
+            // exist
+            FragmentInstanceInfo instanceInfo = new 
FragmentInstanceInfo(NO_SUCH_INSTANCE);
+            instanceInfo.setMessage(
+                String.format(
+                    "Failed to fetch state, has retried %s times",
+                    InstanceStateMetrics.MAX_STATE_FETCH_RETRY_COUNT));
+            updateQueryState(instance.getId(), instanceInfo);
+          } else {
+            // if not reaching max retry count, add retry count, and wait for 
next fetching schedule
+            metrics.addRetryCount();
+            logger.warn("error happened while fetching query state", e);
+          }
         }
       }
     }
@@ -153,44 +172,46 @@ public class FixedRateFragInsStateTracker extends 
AbstractFragInsStateTracker {
 
   private void updateQueryState(FragmentInstanceId instanceId, 
FragmentInstanceInfo instanceInfo) {
     // no such instance may be caused by DN restarting
-    if (instanceInfo.getState() == FragmentInstanceState.NO_SUCH_INSTANCE) {
+    if (instanceInfo.getState() == NO_SUCH_INSTANCE) {
       stateMachine.transitionToFailed(
-          new RuntimeException(
+          new IoTDBException(
               String.format(
                   "FragmentInstance[%s] is failed. %s, may be caused by DN 
restarting.",
-                  instanceId, instanceInfo.getMessage())));
-    }
-    if (instanceInfo.getState().isFailed()) {
-      if (instanceInfo.getFailureInfoList() == null
+                  instanceId, instanceInfo.getMessage()),
+              TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode(),
+              true));
+    } else if (instanceInfo.getState().isFailed()) {
+      if (instanceInfo.getErrorCode().isPresent()) {
+        stateMachine.transitionToFailed(
+            new IoTDBException(
+                instanceInfo.getErrorCode().get().getMessage(),
+                instanceInfo.getErrorCode().get().getCode()));
+      } else if (instanceInfo.getFailureInfoList() == null
           || instanceInfo.getFailureInfoList().isEmpty()) {
         stateMachine.transitionToFailed(
             new RuntimeException(
                 String.format(
                     "FragmentInstance[%s] is failed. %s", instanceId, 
instanceInfo.getMessage())));
-      } else if (instanceInfo.getErrorCode().isPresent()) {
-        stateMachine.transitionToFailed(
-            new IoTDBException(
-                instanceInfo.getErrorCode().get().getMessage(),
-                instanceInfo.getErrorCode().get().getCode()));
       } else {
         
stateMachine.transitionToFailed(instanceInfo.getFailureInfoList().get(0).toException());
       }
-    }
-    boolean queryFinished = false;
-    List<InstanceStateMetrics> rootInstanceStateMetricsList =
-        instanceStateMap.values().stream()
-            .filter(instanceStateMetrics -> 
instanceStateMetrics.isRootInstance)
-            .collect(Collectors.toList());
-    if (!rootInstanceStateMetricsList.isEmpty()) {
-      queryFinished =
-          rootInstanceStateMetricsList.stream()
-              .allMatch(
-                  instanceStateMetrics ->
-                      instanceStateMetrics.lastState == 
FragmentInstanceState.FINISHED);
-    }
+    } else {
+      boolean queryFinished = false;
+      List<InstanceStateMetrics> rootInstanceStateMetricsList =
+          instanceStateMap.values().stream()
+              .filter(instanceStateMetrics -> 
instanceStateMetrics.isRootInstance)
+              .collect(Collectors.toList());
+      if (!rootInstanceStateMetricsList.isEmpty()) {
+        queryFinished =
+            rootInstanceStateMetricsList.stream()
+                .allMatch(
+                    instanceStateMetrics ->
+                        instanceStateMetrics.lastState == 
FragmentInstanceState.FINISHED);
+      }
 
-    if (queryFinished) {
-      stateMachine.transitionToFinished();
+      if (queryFinished) {
+        stateMachine.transitionToFinished();
+      }
     }
   }
 
@@ -203,23 +224,39 @@ public class FixedRateFragInsStateTracker extends 
AbstractFragInsStateTracker {
   }
 
   private static class InstanceStateMetrics {
+    private static final long MAX_STATE_FETCH_RETRY_COUNT = 5;
     private final boolean isRootInstance;
     private FragmentInstanceState lastState;
     private long durationToLastPrintInMS;
+    // we only record the continuous retry count
+    private int retryCount;
 
     private InstanceStateMetrics(boolean isRootInstance) {
       this.isRootInstance = isRootInstance;
       this.lastState = null;
       this.durationToLastPrintInMS = 0L;
+      this.retryCount = 0;
     }
 
     private void reset(FragmentInstanceState newState) {
       this.lastState = newState;
       this.durationToLastPrintInMS = 0L;
+      // each successful fetch, we need to reset the retry count
+      this.retryCount = 0;
+    }
+
+    private void addRetryCount() {
+      this.retryCount++;
+    }
+
+    private boolean reachMaxRetryCount() {
+      return retryCount >= MAX_STATE_FETCH_RETRY_COUNT;
     }
 
     private void addDuration(long duration) {
       durationToLastPrintInMS += duration;
+      // each successful fetch, we need to reset the retry count
+      this.retryCount = 0;
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 679288e7840..9fd7053ba92 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -104,8 +104,15 @@ public class ErrorHandlingUtils {
         if (status.getCode() == TSStatusCode.SQL_PARSE_ERROR.getStatusCode()
             || status.getCode() == TSStatusCode.SEMANTIC_ERROR.getStatusCode()
             || status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode()
-            || status.getCode() == TSStatusCode.ILLEGAL_PATH.getStatusCode()) {
-          LOGGER.warn(message);
+            || status.getCode() == TSStatusCode.ILLEGAL_PATH.getStatusCode()
+            || status.getCode() == 
TSStatusCode.NUMERIC_VALUE_OUT_OF_RANGE.getStatusCode()
+            || status.getCode() == 
TSStatusCode.DIVISION_BY_ZERO.getStatusCode()
+            || status.getCode() == 
TSStatusCode.DATE_OUT_OF_RANGE.getStatusCode()
+            || status.getCode() == 
TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode()
+            || status.getCode() == 
TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode()
+            || status.getCode() == 
TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode()
+            || status.getCode() == 
TSStatusCode.CANNOT_FETCH_FI_STATE.getStatusCode()) {
+          LOGGER.info(message);
         } else {
           LOGGER.warn(message, e);
         }

Reply via email to