This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ffa4b67deb Correct status code from 301 to 719 while memory not enough & Record release resource blocked too long by driver  (#15828)
0ffa4b67deb is described below

commit 0ffa4b67deb7ede43a93d3250a0acbf7306b654a
Author: Jackie Tien <[email protected]>
AuthorDate: Thu Jun 26 17:44:08 2025 +0800

    Correct status code from 301 to 719 while memory not enough & Record release resource blocked too long by driver  (#15828)
---
 .../queryengine/execution/QueryStateMachine.java   | 58 ++++++++++++----------
 .../fragment/FragmentInstanceContext.java          |  7 +++
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  7 +--
 3 files changed, 41 insertions(+), 31 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
index a6e3acfb44d..cc0f787b014 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.rpc.RpcUtils;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
@@ -50,8 +51,8 @@ import static 
org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause;
 public class QueryStateMachine {
   private final StateMachine<QueryState> queryState;
 
-  private Throwable failureException;
-  private TSStatus failureStatus;
+  private final AtomicReference<Throwable> failureException = new 
AtomicReference<>();
+  private final AtomicReference<TSStatus> failureStatus = new 
AtomicReference<>();
 
   public QueryStateMachine(QueryId queryId, ExecutorService executor) {
     this.queryState =
@@ -74,6 +75,8 @@ public class QueryStateMachine {
 
   public void transitionToQueued() {
     queryState.set(QUEUED);
+    failureException.set(null);
+    failureStatus.set(null);
   }
 
   public void transitionToPlanned() {
@@ -85,7 +88,7 @@ public class QueryStateMachine {
   }
 
   public void transitionToPendingRetry(TSStatus failureStatus) {
-    this.failureStatus = failureStatus;
+    this.failureStatus.compareAndSet(null, failureStatus);
     queryState.setIf(PENDING_RETRY, currentState -> currentState == 
DISPATCHING);
   }
 
@@ -105,10 +108,9 @@ public class QueryStateMachine {
   }
 
   public void transitionToCanceled(Throwable throwable, TSStatus 
failureStatus) {
-    if (transitionToDoneState(CANCELED)) {
-      this.failureException = throwable;
-      this.failureStatus = failureStatus;
-    }
+    this.failureStatus.compareAndSet(null, failureStatus);
+    this.failureException.compareAndSet(null, throwable);
+    transitionToDoneState(CANCELED);
   }
 
   public void transitionToAborted() {
@@ -120,15 +122,13 @@ public class QueryStateMachine {
   }
 
   public void transitionToFailed(Throwable throwable) {
-    if (transitionToDoneState(FAILED)) {
-      this.failureException = throwable;
-    }
+    this.failureException.compareAndSet(null, throwable);
+    transitionToDoneState(FAILED);
   }
 
   public void transitionToFailed(TSStatus failureStatus) {
-    if (transitionToDoneState(FAILED)) {
-      this.failureStatus = failureStatus;
-    }
+    this.failureStatus.compareAndSet(null, failureStatus);
+    transitionToDoneState(FAILED);
   }
 
   private boolean transitionToDoneState(QueryState doneState) {
@@ -139,31 +139,37 @@ public class QueryStateMachine {
   }
 
   public String getFailureMessage() {
-    if (failureException != null) {
-      return failureException.getMessage();
+    Throwable throwable = failureException.get();
+    if (throwable != null) {
+      return throwable.getMessage();
     }
     return "no detailed failure reason in QueryStateMachine";
   }
 
   public Throwable getFailureException() {
-    if (failureException == null) {
+    Throwable throwable = failureException.get();
+    if (throwable == null) {
       return new IoTDBException(getFailureStatus().getMessage(), 
getFailureStatus().code);
     } else {
-      return failureException;
+      return throwable;
     }
   }
 
   public TSStatus getFailureStatus() {
-    if (failureStatus != null) {
-      return failureStatus;
-    } else if (failureException != null) {
-      Throwable t = getRootCause(failureException);
-      if (t instanceof IoTDBRuntimeException) {
-        return RpcUtils.getStatus(((IoTDBRuntimeException) t).getErrorCode(), 
t.getMessage());
-      } else if (t instanceof IoTDBException) {
-        return RpcUtils.getStatus(((IoTDBException) t).getErrorCode(), 
t.getMessage());
+    TSStatus status = failureStatus.get();
+    if (status != null) {
+      return status;
+    } else {
+      Throwable throwable = failureException.get();
+      if (throwable != null) {
+        Throwable t = getRootCause(throwable);
+        if (t instanceof IoTDBRuntimeException) {
+          return RpcUtils.getStatus(((IoTDBRuntimeException) 
t).getErrorCode(), t.getMessage());
+        } else if (t instanceof IoTDBException) {
+          return RpcUtils.getStatus(((IoTDBException) t).getErrorCode(), 
t.getMessage());
+        }
       }
+      return failureStatus.get();
     }
-    return failureStatus;
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index 55df223db17..8b9a2a274a1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -78,6 +78,8 @@ public class FragmentInstanceContext extends QueryContext {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(FragmentInstanceContext.class);
   private static final long END_TIME_INITIAL_VALUE = -1L;
+  // wait over 5s for driver to close is abnormal
+  private static final long LONG_WAIT_DURATION = 5_000_000_000L;
   private final FragmentInstanceId id;
 
   private final FragmentInstanceStateMachine stateMachine;
@@ -708,6 +710,7 @@ public class FragmentInstanceContext extends QueryContext {
 
   @SuppressWarnings("squid:S2142")
   public void releaseResourceWhenAllDriversAreClosed() {
+    long startTime = System.nanoTime();
     while (true) {
       try {
         allDriversClosed.await();
@@ -718,6 +721,10 @@ public class FragmentInstanceContext extends QueryContext {
             "Interrupted when await on allDriversClosed, FragmentInstance Id 
is {}", this.getId());
       }
     }
+    long duration = System.nanoTime() - startTime;
+    if (duration >= LONG_WAIT_DURATION) {
+      LOGGER.warn("Wait {}ms for all Drivers closed", duration / 1_000_000);
+    }
     releaseResource();
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index a756a26df7f..a4f21d2f20f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -471,8 +471,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
               } else if (sendFragmentInstanceResp.status.getCode()
                   == TSStatusCode.CONSENSUS_GROUP_NOT_EXIST.getStatusCode()) {
                 throw new 
ConsensusGroupNotExistException(sendFragmentInstanceResp.message);
-              } else if (sendFragmentInstanceResp.status.getCode()
-                  == 
TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()) {
+              } else {
                 throw new 
FragmentInstanceDispatchException(sendFragmentInstanceResp.status);
               }
             } else if (sendFragmentInstanceResp.status != null) {
@@ -610,9 +609,7 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
         if (!executionResult.isAccepted()) {
           LOGGER.warn(executionResult.getMessage());
           if (executionResult.isReadNeedRetry()) {
-            if (executionResult.getStatus() != null
-                && executionResult.getStatus().getCode()
-                    == 
TSStatusCode.TOO_MANY_CONCURRENT_QUERIES_ERROR.getStatusCode()) {
+            if (executionResult.getStatus() != null) {
               throw new 
FragmentInstanceDispatchException(executionResult.getStatus());
             }
             throw new FragmentInstanceDispatchException(

Reply via email to