This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch tryFixExceptionHandle in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4c2752c93d4b7162de8c205fbb9ddc84e5c9bfb7 Author: JackieTien97 <[email protected]> AuthorDate: Wed Oct 9 11:50:26 2024 +0800 Try to fix error msg like: 301: queue has been destroyed --- .../queryengine/execution/exchange/SharedTsBlockQueue.java | 10 ++++++++++ .../execution/exchange/source/LocalSourceHandle.java | 12 ++++++++++++ 2 files changed, 22 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java index 6c556c1ae55..b3b94f66282 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java @@ -38,6 +38,7 @@ import javax.annotation.concurrent.NotThreadSafe; import java.util.LinkedList; import java.util.Queue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import static com.google.common.util.concurrent.Futures.immediateVoidFuture; @@ -177,6 +178,15 @@ public class SharedTsBlockQueue { */ public TsBlock remove() { if (closed) { + // try throw underlying exception instead of "Source handle is aborted." + try { + blocked.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } catch (ExecutionException e) { + throw new IllegalStateException(e.getCause() == null ? e : e.getCause()); + } throw new IllegalStateException("queue has been destroyed"); } TsBlock tsBlock = queue.remove(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java index 315e75cdfc5..4da89f72d07 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; +import java.util.concurrent.ExecutionException; import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; import static org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.createFullIdFrom; @@ -254,6 +255,17 @@ public class LocalSourceHandle implements ISourceHandle { private void checkState() { if (aborted) { + if (queue.isBlocked().isDone()) { + // try throw underlying exception instead of "Source handle is aborted." + try { + queue.isBlocked().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } catch (ExecutionException e) { + throw new IllegalStateException(e.getCause() == null ? e : e.getCause()); + } + } throw new IllegalStateException("Source handle is aborted."); } else if (closed) { throw new IllegalStateException("Source Handle is closed.");
