This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/ThrowTimeout
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 94f2f23c4b4f02acb4a2a708307aa97ad326ae41
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Jun 17 10:05:55 2025 +0800

    Fix timeout didn't pass to client bug
---
 .../execution/exchange/source/LocalSourceHandle.java        | 13 +++++++++----
 .../execution/fragment/FragmentInstanceManager.java         |  8 +++++---
 .../iotdb/commons/exception/QueryTimeoutException.java      |  4 ++++
 3 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
index 325ffb20bd0..0c2d20d46ea 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
@@ -158,6 +158,7 @@ public class LocalSourceHandle implements ISourceHandle {
   @Override
   public boolean isFinished() {
     synchronized (queue) {
+      checkSharedQueueIfAborted();
       return queue.hasNoMoreTsBlocks() && queue.isEmpty();
     }
   }
@@ -254,10 +255,7 @@ public class LocalSourceHandle implements ISourceHandle {
 
   private void checkState() {
     if (aborted || closed) {
-      Optional<Throwable> abortedCause = queue.getAbortedCause();
-      if (abortedCause.isPresent()) {
-        throw new IllegalStateException(abortedCause.get());
-      }
+      checkSharedQueueIfAborted();
       if (queue.isBlocked().isDone()) {
         // try throw underlying exception instead of "Source handle is aborted."
         try {
@@ -274,6 +272,13 @@ public class LocalSourceHandle implements ISourceHandle {
     }
   }
 
+  private void checkSharedQueueIfAborted() {
+    Optional<Throwable> abortedCause = queue.getAbortedCause();
+    if (abortedCause.isPresent()) {
+      throw new IllegalStateException(abortedCause.get());
+    }
+  }
+
   public SharedTsBlockQueue getSharedTsBlockQueue() {
     return queue;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index f1eed1d1f4e..c8fded6e2a8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.exception.QueryTimeoutException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.common.QueryId;
@@ -55,7 +56,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static java.util.Objects.requireNonNull;
@@ -412,8 +412,10 @@ public class FragmentInstanceManager {
             execution
                 .getStateMachine()
                 .failed(
-                    new TimeoutException(
-                        "Query has executed more than " + execution.getTimeoutInMs() + "ms"));
+                    new QueryTimeoutException(
+                        "Query has executed more than "
+                            + execution.getTimeoutInMs()
+                            + "ms, and now is in flushing state"));
           }
         });
   }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QueryTimeoutException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QueryTimeoutException.java
index 61569a83826..ed8cce83fbd 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QueryTimeoutException.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QueryTimeoutException.java
@@ -27,4 +27,8 @@ public class QueryTimeoutException extends IoTDBRuntimeException {
   public QueryTimeoutException() {
     super("Query execution is time out", QUERY_TIMEOUT.getStatusCode(), true);
   }
+
+  public QueryTimeoutException(String message) {
+    super(message, QUERY_TIMEOUT.getStatusCode(), true);
+  }
 }

Reply via email to