This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/ThrowTimeout in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 94f2f23c4b4f02acb4a2a708307aa97ad326ae41 Author: JackieTien97 <[email protected]> AuthorDate: Tue Jun 17 10:05:55 2025 +0800 Fix timeout didn't pass to client bug --- .../execution/exchange/source/LocalSourceHandle.java | 13 +++++++++---- .../execution/fragment/FragmentInstanceManager.java | 8 +++++--- .../iotdb/commons/exception/QueryTimeoutException.java | 4 ++++ 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java index 325ffb20bd0..0c2d20d46ea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java @@ -158,6 +158,7 @@ public class LocalSourceHandle implements ISourceHandle { @Override public boolean isFinished() { synchronized (queue) { + checkSharedQueueIfAborted(); return queue.hasNoMoreTsBlocks() && queue.isEmpty(); } } @@ -254,10 +255,7 @@ public class LocalSourceHandle implements ISourceHandle { private void checkState() { if (aborted || closed) { - Optional<Throwable> abortedCause = queue.getAbortedCause(); - if (abortedCause.isPresent()) { - throw new IllegalStateException(abortedCause.get()); - } + checkSharedQueueIfAborted(); if (queue.isBlocked().isDone()) { // try throw underlying exception instead of "Source handle is aborted." try { @@ -274,6 +272,13 @@ public class LocalSourceHandle implements ISourceHandle { } } + private void checkSharedQueueIfAborted() { + Optional<Throwable> abortedCause = queue.getAbortedCause(); + if (abortedCause.isPresent()) { + throw new IllegalStateException(abortedCause.get()); + } + } + public SharedTsBlockQueue getSharedTsBlockQueue() { return queue; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index f1eed1d1f4e..c8fded6e2a8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; +import org.apache.iotdb.commons.exception.QueryTimeoutException; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.QueryId; @@ -55,7 +56,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import static java.util.Objects.requireNonNull; @@ -412,8 +412,10 @@ public class FragmentInstanceManager { execution .getStateMachine() .failed( - new TimeoutException( - "Query has executed more than " + execution.getTimeoutInMs() + "ms")); + new QueryTimeoutException( + "Query has executed more than " + + execution.getTimeoutInMs() + + "ms, and now is in flushing state")); } }); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QueryTimeoutException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QueryTimeoutException.java index 61569a83826..ed8cce83fbd 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QueryTimeoutException.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/QueryTimeoutException.java @@ -27,4 +27,8 @@ public class QueryTimeoutException extends IoTDBRuntimeException { public QueryTimeoutException() { super("Query execution is time out", QUERY_TIMEOUT.getStatusCode(), true); } + + public QueryTimeoutException(String message) { + super(message, QUERY_TIMEOUT.getStatusCode(), true); + } }
