This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/ThrowTimeout-1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 734c662960ee768500e400210512dd93564d6f03
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Jun 17 12:03:25 2025 +0800

    Fix timeout didn't pass to client bug
    
    (cherry picked from commit 232e0c9054f3660b888c921b3b7b256469aad516)
---
 .../query/QueryTimeoutRuntimeException.java        |  4 +++
 .../execution/exchange/SharedTsBlockQueue.java     | 21 ++++++++++++++++
 .../exchange/source/LocalSourceHandle.java         | 29 +++++++++++++++++++---
 .../fragment/FragmentInstanceManager.java          |  8 +++---
 4 files changed, 55 insertions(+), 7 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java
index ecabba0542f..dec3081ec7a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/query/QueryTimeoutRuntimeException.java
@@ -29,4 +29,8 @@ public class QueryTimeoutRuntimeException extends 
RuntimeException {
         String.format(
             QUERY_TIMEOUT_EXCEPTION_MESSAGE, startTime, startTime + timeout, 
currentTime));
   }
+
+  public QueryTimeoutRuntimeException(String message) {
+    super(message);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
index 154ffa9c714..dcd062052b7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
@@ -37,7 +37,9 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.util.LinkedList;
+import java.util.Optional;
 import java.util.Queue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
 import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
@@ -81,6 +83,8 @@ public class SharedTsBlockQueue {
   private long maxBytesCanReserve =
       
IoTDBDescriptor.getInstance().getConfig().getMaxBytesPerFragmentInstance();
 
+  private volatile Throwable abortedCause = null;
+
   // used for SharedTsBlockQueue listener
   private final ExecutorService executorService;
 
@@ -177,6 +181,18 @@ public class SharedTsBlockQueue {
    */
   public TsBlock remove() {
     if (closed) {
+      // try throw underlying exception instead of "Source handle is aborted."
+      if (abortedCause != null) {
+        throw new IllegalStateException(abortedCause);
+      }
+      try {
+        blocked.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(e);
+      } catch (ExecutionException e) {
+        throw new IllegalStateException(e.getCause() == null ? e : 
e.getCause());
+      }
       throw new IllegalStateException("queue has been destroyed");
     }
     TsBlock tsBlock = queue.remove();
@@ -332,6 +348,7 @@ public class SharedTsBlockQueue {
     if (closed) {
       return;
     }
+    abortedCause = t;
     closed = true;
     if (!blocked.isDone()) {
       blocked.setException(t);
@@ -354,4 +371,8 @@ public class SharedTsBlockQueue {
       bufferRetainedSizeInBytes = 0;
     }
   }
+
+  public Optional<Throwable> getAbortedCause() {
+    return Optional.ofNullable(abortedCause);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
index c31f9d655de..0c2d20d46ea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
@@ -36,6 +36,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
 
 import static 
com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 import static 
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
@@ -156,6 +158,7 @@ public class LocalSourceHandle implements ISourceHandle {
   @Override
   public boolean isFinished() {
     synchronized (queue) {
+      checkSharedQueueIfAborted();
       return queue.hasNoMoreTsBlocks() && queue.isEmpty();
     }
   }
@@ -251,10 +254,28 @@ public class LocalSourceHandle implements ISourceHandle {
   }
 
   private void checkState() {
-    if (aborted) {
-      throw new IllegalStateException("Source handle is aborted.");
-    } else if (closed) {
-      throw new IllegalStateException("Source Handle is closed.");
+    if (aborted || closed) {
+      checkSharedQueueIfAborted();
+      if (queue.isBlocked().isDone()) {
+        // try throw underlying exception instead of "Source handle is 
aborted."
+        try {
+          queue.isBlocked().get();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException(e);
+        } catch (ExecutionException e) {
+          throw new IllegalStateException(e.getCause() == null ? e : 
e.getCause());
+        }
+      }
+      throw new IllegalStateException(
+          "LocalSinkChannel state is ." + (aborted ? "ABORTED" : "CLOSED"));
+    }
+  }
+
+  private void checkSharedQueueIfAborted() {
+    Optional<Throwable> abortedCause = queue.getAbortedCause();
+    if (abortedCause.isPresent()) {
+      throw new IllegalStateException(abortedCause.get());
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
index a82e5a6d92e..5cb43d86bc0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.execution.driver.IDriver;
@@ -56,7 +57,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static java.util.Objects.requireNonNull;
@@ -421,8 +421,10 @@ public class FragmentInstanceManager {
             execution
                 .getStateMachine()
                 .failed(
-                    new TimeoutException(
-                        "Query has executed more than " + 
execution.getTimeoutInMs() + "ms"));
+                    new QueryTimeoutRuntimeException(
+                        "Query has executed more than "
+                            + execution.getTimeoutInMs()
+                            + "ms, and now is in flushing state"));
           }
         });
   }

Reply via email to