This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch tryFixExceptionHandle
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4c2752c93d4b7162de8c205fbb9ddc84e5c9bfb7
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Oct 9 11:50:26 2024 +0800

    Try to fix error msg like: 301: queue has been destroyed
---
 .../queryengine/execution/exchange/SharedTsBlockQueue.java   | 10 ++++++++++
 .../execution/exchange/source/LocalSourceHandle.java         | 12 ++++++++++++
 2 files changed, 22 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
index 6c556c1ae55..b3b94f66282 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/SharedTsBlockQueue.java
@@ -38,6 +38,7 @@ import javax.annotation.concurrent.NotThreadSafe;
 
 import java.util.LinkedList;
 import java.util.Queue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
 import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
@@ -177,6 +178,15 @@ public class SharedTsBlockQueue {
    */
   public TsBlock remove() {
     if (closed) {
+      // try throw underlying exception instead of "Source handle is aborted."
+      try {
+        blocked.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(e);
+      } catch (ExecutionException e) {
+        throw new IllegalStateException(e.getCause() == null ? e : 
e.getCause());
+      }
       throw new IllegalStateException("queue has been destroyed");
     }
     TsBlock tsBlock = queue.remove();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
index 315e75cdfc5..4da89f72d07 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/source/LocalSourceHandle.java
@@ -36,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
 
 import static 
com.google.common.util.concurrent.Futures.nonCancellationPropagating;
 import static 
org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager.createFullIdFrom;
@@ -254,6 +255,17 @@ public class LocalSourceHandle implements ISourceHandle {
 
   private void checkState() {
     if (aborted) {
+      if (queue.isBlocked().isDone()) {
+        // try throw underlying exception instead of "Source handle is 
aborted."
+        try {
+          queue.isBlocked().get();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException(e);
+        } catch (ExecutionException e) {
+          throw new IllegalStateException(e.getCause() == null ? e : 
e.getCause());
+        }
+      }
       throw new IllegalStateException("Source handle is aborted.");
     } else if (closed) {
       throw new IllegalStateException("Source Handle is closed.");

Reply via email to